c++98 沒有提供 線程能力 程序員必須使用平台api c++11 增加了新的線程庫 在此之前 boost 提供了 thread庫 實現了多線程 thread庫需要編譯 bjam --toolset=msvc-10.0 --with-thread --build-type=complete --prefix=D:\Boost install
thread庫 使用了 date_time庫 故使用前 還要編譯 配置 data_time庫 在linux/unix下 鏈接時還需要使用 -lpthread 來鏈接 POSIX線程庫
在線程中 使用 全局變量 或 static 局部變量 將在多線程 重入thread時 無法保證變量值的正確性(前一個thread 可能改變了此值) 線程本地存儲 tls(thread local storage) 或稱為 線程專有存儲 tss(thread specific storage) 使變量使用起來 像是線程 獨有的 thread庫 使用thread_specific_ptr 提供tss功能 其接口類似 shared_ptr
boost::mutex m; void work_thread() { boost::thread_specific_ptr<int> ptr; ptr.reset(new int()); ++(*ptr); { boost::mutex::scoped_lock sl(m); std::cout<<*ptr<<std::endl; } } int _tmain(int argc, _TCHAR* argv[]) { boost::thread_group tg; for(int i=0;i<5;++i) { tg.create_thread(work_thread); } tg.join_all(); std::system("pause"); return 0; }
//輸出 1 1 1 1 1
thread_specific_ptr 無參初始化為 NULL指針 其未 operator bool 判斷指針是否為空 應該 get
call_once 提供了線程只執行一次的功能 線程中 為call_once傳入 一個boost::once_flag = BOOST_ONCE_INIT的標誌 以及一個 void()簽名檔 初始化函數 thread庫將 保證 只有一個線程 執行 void() 函數
boost::once_flag flag = BOOST_ONCE_INIT; void once_fun() { puts("once_fun"); boost::this_thread::sleep(boost::posix_time::seconds(5)); } void work_thread() { boost::call_once(flag,once_fun); puts("thread"); } int _tmain(int argc, _TCHAR* argv[]) { boost::thread_group tg; for(int i=0;i<5;++i) { tg.create_thread(work_thread); } tg.join_all(); std::system("pause"); return 0; }
thread庫 保證 只有第一個線程執行 call_once 的 初始化函數 且其他線程 執行到 call_once 時 若第一個 call_once未執行完 將等待 其執行完 once_flag 不能說線程 局部變量 (每個線程局部變量 是獨立的) once_flag 不能說線程 static 局部變量(c++ 靜態變量的 初始化 不是線程安全的)
barrier 護欄 或稱為 rendezvous 約會地點 更恰當 執行 barrier 的 wait 方法 將使線程 等待 直到 其他線程 都執行到 barrier 的 wait 方法
//定義一個 3 個thread的約會 boost::barrier br(3); boost::mutex m; template<int N> void work_thread() { { boost::mutex::scoped_lock sl(m); std::cout<<"this is work_htread_"<<N<<"\nwait...\n"; } boost::this_thread::sleep(boost::posix_time::seconds(N)); //約會地點 br.wait(); { boost::mutex::scoped_lock sl(m); std::cout<<"this is work_htread_"<<N<<"\nend...\n"; } } int _tmain(int argc, _TCHAR* argv[]) { boost::thread_group tg; tg.create_thread(work_thread<1>); tg.create_thread(work_thread<2>); tg.create_thread(work_thread<3>); tg.join_all(); std::system("pause"); return 0; }
condition_variable condition_variable_any 一般用在 某種條件 達成時 通知 另外一個 線程 wait 可以 掛起線程 直達 得到一個 notify_one/notify_all 通知 notify_one 可以 喚醒一個 wait 線程 notify_all 可以 喚醒所有 wait 線程 condition_variable 的 wait 必須 使用 unique_lock 而 condition_variable_any 可以用在 更多 型別的 ***_lock 上 使用 condition_variable 可以很容易的 實現 生產消費 模型
#include <cassert> #include <iostream> #include <ctime> #include <cstdlib> #include <boost/thread.hpp> //同步對象 boost::mutex m; //生產者 通知 boost::condition_variable cv_producer; //消費者 通知 boost::condition_variable cv_consumer; //產品 為0 未生產 int v = 0; void producer() { while(true) { boost::unique_lock<boost::mutex> lock(m); while(v) { //產品還未被消費掉 等待消費者消費 cv_producer.wait(lock); } assert(v == 0); v = std::rand()%10 + 1; std::cout<<"producer "<<v<<"\n"; //喚醒一個 消費者 cv_consumer.notify_one(); } } void consumer() { while(true) { boost::unique_lock<boost::mutex> lock(m); while(!v) { //產品還未生產 等待生產者生產 cv_consumer.wait(lock); } assert(v != 0); std::cout<<"consumer "<<v<<"\n"; v = 0; //喚醒一個 生產者 cv_producer.notify_one(); } } int main() { std::srand(std::time(NULL)); //創建 生產者 boost::thread producer0(producer); for(int i=0;i<std::rand()%5+5;++i) { boost::thread t(producer); } //創建 消費者 for(int i=0;i<std::rand()%5+5;++i) { boost::thread t(consumer); } producer0.join(); return 0; }
wait 調用時 會 unlock mutex 使用其他線程能夠 lock wait 返回時 會 重新lock mutex 使用其他線程 無法lock
所謂讀寫鎖 既鎖定讀時不可寫 鎖定寫時不可讀 然可以被 多次 讀鎖定 thread 庫的 shared_mutex 提供了讀寫鎖功能 可以把 shared_mutex 完全當 mutex 使用 然此比 mutex 代價更高
//寫鎖定 lock try_lock timed_lock //解鎖 寫鎖定 unlock //讀鎖定 lock_shared try_lock_shared timed_lock_shared //解鎖 讀鎖定 unlock_shared //RAII 對象 構造時鎖定 析構 解鎖 shared_lock<shared_mutex> //讀 unique_lock<shared_mutex> //寫
#include<boost\thread.hpp> int _tmain(int argc, _TCHAR* argv[]) { boost::shared_mutex sm; { //寫鎖定 boost::unique_lock<boost::shared_mutex> wm(sm); assert(!sm.try_lock()); assert(!sm.try_lock_shared()); } { //多次讀鎖定 boost::shared_lock<boost::shared_mutex> rm1(sm); boost::shared_lock<boost::shared_mutex> rm2(sm); assert(sm.try_lock_shared()); sm.unlock_shared(); assert(!sm.try_lock()); } boost::unique_lock<boost::shared_mutex> wm(sm); std::system("pause"); return 0; }
boost提供了如下 用於同步到 互斥量 mutex //獨占互斥量 try_mutex //同上 不過為兼容舊版本的 typedef timed_mutex //同上 不過提供了 超時 鎖定功能 recursive_mutex //同mutex 不過可以被多次鎖定(遞歸鎖定) 當然需要對應次數 解鎖 recursive_try_mutex //同上 不過為兼容就版本的 typedef recursive_timed_mutex //同上 不過提供超時鎖定功能 shared_mutex //讀寫鎖(或稱為 共享互斥量)
//鎖定互斥量 void lock() //解鎖互斥量 void unlock() //嘗試鎖定互斥量 成功返回 true 否則false bool try_lock() //傳入時間點 或 時間段 作為超時時間 鎖定互斥量 成功返回 true 超時 false bool timed_lock(...) //RAII 類對象 構造時 lock 互斥量 析構 unlock 互斥量 class boost::mutex::scoped_lock
#include<boost\thread.hpp> int _tmain(int argc, _TCHAR* argv[]) { boost::mutex m; { boost::mutex::scoped_lock lock(m); assert(!m.try_lock()); } assert(m.try_lock()); m.unlock(); std::system("pause"); return 0; }
在 win32 下 調用 mutex::unlock() 如果 mutex 處於 unlock 狀態 mutex 將會被 lock (此顯然是一個 bug version 1.55 and 1.61 確認)
thread_group 使用 std::list<thread*> 容納thread對象 使用其 接口join_all interrupt_all 可以對所有線程 進行 join interrupt 管理
//將一個 thread 加入 thread_group void add_thread(thread* thrd) //新啟一個線程 加入 thread_group 並返回其 thread* thread* create_thread(F threadfunc) //將一個 thread 從 thread_group中移除 不會delete void remove_thread(thread* thrd)
#include<boost\thread.hpp> void work_fun(const int n) { try { while(true) { boost::this_thread::sleep(boost::posix_time::seconds(n)); } } catch(boost::thread_interrupted&) { std::cout<<"thread is end"<<std::endl; } } int _tmain(int argc, _TCHAR* argv[]) { boost::function<void()> f = boost::bind(work_fun,1); boost::thread_group g; for(unsigned int i=0;i<boost::thread::hardware_concurrency();++i) { g.create_thread(f); } boost::this_thread::sleep(boost::posix_time::seconds(2)); g.interrupt_all(); g.join_all(); std::system("pause"); return 0; }
thread_group 析構時 會自動delete所有 thread* 然remove_thread 移除的 thread× 不會被delete 需要自己 delete
線程中斷時個麻煩的事情 boost 提供了很好的解決方案 既中斷 interrupt 調用thread的 interrupt 將引發一個斷點 工作線程 try 捕獲此異常 便可知道 應該中斷 interrupt要使 線程拋出異常 線程函數中 必須有中斷點 (調用 interrupt後 線程執行到中斷的 才會拋出異常) 一下是 boost 預定義的中斷的 thread::join thread::time_join condition_variable::wait condition_variable::timed_wait condition_variable_any::wait condition_variable_any::timed_wait thread::sleep this_thread::sleep this_thread::interruption_point 以上皆是某種等待函數 只有interruption_point唯一的作用就是標識一個 中斷點
thread默認啟用中斷 然boost 以提供了接口 以控制線程 是否可被 interrupt 中斷 那麼誰怕測 boost::this_thread //返回 當前線程是否可被 中斷 bool interruption_enabled() //RAII類對象 構造關閉線程中斷功能 析構啟用線程中斷功能 class disable_interruption //RAII類對象 只能在 disable_interruption 生命期使用 構造啟用中斷功能 析構關閉中斷功能 class restore_interruption
#include<boost\thread.hpp> void work_fun(const int n) { try { while(true) { boost::this_thread::sleep(boost::posix_time::seconds(n)); } } catch(boost::thread_interrupted&) { std::cout<<"thread is end"<<std::endl; } } int _tmain(int argc, _TCHAR* argv[]) { boost::function<void()> f = boost::bind(work_fun,5); boost::thread t(f); boost::this_thread::sleep(boost::posix_time::seconds(2)); t.interrupt(); t.join(); std::system("pause"); return 0; }
interrupt 只會通知線程 應該拋出中斷異常 而不會等待線程 結束 若需要等待線程 執行結束 join
#include <boost/thread.hpp> class thread 是thread庫的 核心實現 其負責創建 管理線程 thread 接受一個functor作為 構造參數 同時可傳入 9個參數 以供線程執行functor時 傳遞給 functor thread保存 functor 9個參數 的拷貝 若需要引用 使用 ref庫 thread一旦 構造 線程立刻執行 boost::thread不提供 start begin ... 等接口
//返回 thread是否 標識了一個 可執行線程(線程執行結束 或 detach 返回false) bool joinable() const //等待線程執行結束 void join(); timed_join //同上 不過接受一個 時間段 作為等待超時 //將thread與線程執行體分離 之後 thread不能再管理線程 然線程會繼續執行 void detach(); //返回線程id對象 支持copy compare id get_id() //放棄執行時間片 static void yield() //睡眠 傳入時間點 static inline void sleep(const system_time& xt) //返回cpu數 static unsigned hardware_concurrency() boost::this_thread 提供了三個函數 用於操作當前線程 sleep //可以是時間段 或 時間點 yield get_id
#include <boost/thread.hpp> void work_fun(const int n) { for(int i=0;i<n;++i) { std::cout<<i<<"\n"; } } int _tmain(int argc, _TCHAR* argv[]) { boost::thread t(work_fun,5); t.timed_join(boost::posix_time::seconds(1)); t.join(); std::system("pause"); return 0; }