c++98 沒有提供 線程能力 程序員必須使用平台api c++11 增加了新的線程庫 在此之前 boost 提供了 thread庫 實現了多線程 thread庫需要編譯 bjam --toolset=msvc-10.0 --with-thread --build-type=complete --prefix=D:\Boost install
thread庫 使用了 date_time庫 故使用前 還要編譯 配置 data_time庫 在linux/unix下 鏈接時還需要使用 -lpthread 來鏈接 POSIX線程庫
在線程中 使用 全局變量 或 static 局部變量 將在多線程 重入thread時 無法保證變量值的正確性(前一個thread 可能改變了此值) 線程本地存儲 tls(thread local storage) 或稱為 線程專有存儲 tss(thread specific storage) 使變量使用起來 像是線程 獨有的 thread庫 使用thread_specific_ptr 提供tss功能 其接口類似 shared_ptr
boost::mutex m;
void work_thread()
{
boost::thread_specific_ptr<int> ptr;
ptr.reset(new int());
++(*ptr);
{
boost::mutex::scoped_lock sl(m);
std::cout<<*ptr<<std::endl;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::thread_group tg;
for(int i=0;i<5;++i)
{
tg.create_thread(work_thread);
}
tg.join_all();
std::system("pause");
return 0;
}
//輸出 1 1 1 1 1
thread_specific_ptr 無參初始化為 NULL指針 其未 operator bool 判斷指針是否為空 應該 get
call_once 提供了線程只執行一次的功能 線程中 為call_once傳入 一個boost::once_flag = BOOST_ONCE_INIT的標誌 以及一個 void()簽名檔 初始化函數 thread庫將 保證 只有一個線程 執行 void() 函數
boost::once_flag flag = BOOST_ONCE_INIT;
void once_fun()
{
puts("once_fun");
boost::this_thread::sleep(boost::posix_time::seconds(5));
}
void work_thread()
{
boost::call_once(flag,once_fun);
puts("thread");
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::thread_group tg;
for(int i=0;i<5;++i)
{
tg.create_thread(work_thread);
}
tg.join_all();
std::system("pause");
return 0;
}
thread庫 保證 只有第一個線程執行 call_once 的 初始化函數 且其他線程 執行到 call_once 時 若第一個 call_once未執行完 將等待 其執行完 once_flag 不能說線程 局部變量 (每個線程局部變量 是獨立的) once_flag 不能說線程 static 局部變量(c++ 靜態變量的 初始化 不是線程安全的)
barrier 護欄 或稱為 rendezvous 約會地點 更恰當 執行 barrier 的 wait 方法 將使線程 等待 直到 其他線程 都執行到 barrier 的 wait 方法
//定義一個 3 個thread的約會
boost::barrier br(3);
boost::mutex m;
template<int N>
void work_thread()
{
{
boost::mutex::scoped_lock sl(m);
std::cout<<"this is work_htread_"<<N<<"\nwait...\n";
}
boost::this_thread::sleep(boost::posix_time::seconds(N));
//約會地點
br.wait();
{
boost::mutex::scoped_lock sl(m);
std::cout<<"this is work_htread_"<<N<<"\nend...\n";
}
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::thread_group tg;
tg.create_thread(work_thread<1>);
tg.create_thread(work_thread<2>);
tg.create_thread(work_thread<3>);
tg.join_all();
std::system("pause");
return 0;
}condition_variable condition_variable_any 一般用在 某種條件 達成時 通知 另外一個 線程 wait 可以 掛起線程 直達 得到一個 notify_one/notify_all 通知 notify_one 可以 喚醒一個 wait 線程 notify_all 可以 喚醒所有 wait 線程 condition_variable 的 wait 必須 使用 unique_lock 而 condition_variable_any 可以用在 更多 型別的 ***_lock 上 使用 condition_variable 可以很容易的 實現 生產消費 模型
#include <cassert>
#include <iostream>
#include <ctime>
#include <cstdlib>
#include <boost/thread.hpp>
//同步對象
boost::mutex m;
//生產者 通知
boost::condition_variable cv_producer;
//消費者 通知
boost::condition_variable cv_consumer;
//產品 為0 未生產
int v = 0;
void producer()
{
while(true)
{
boost::unique_lock<boost::mutex> lock(m);
while(v)
{
//產品還未被消費掉 等待消費者消費
cv_producer.wait(lock);
}
assert(v == 0);
v = std::rand()%10 + 1;
std::cout<<"producer "<<v<<"\n";
//喚醒一個 消費者
cv_consumer.notify_one();
}
}
void consumer()
{
while(true)
{
boost::unique_lock<boost::mutex> lock(m);
while(!v)
{
//產品還未生產 等待生產者生產
cv_consumer.wait(lock);
}
assert(v != 0);
std::cout<<"consumer "<<v<<"\n";
v = 0;
//喚醒一個 生產者
cv_producer.notify_one();
}
}
int main()
{
std::srand(std::time(NULL));
//創建 生產者
boost::thread producer0(producer);
for(int i=0;i<std::rand()%5+5;++i)
{
boost::thread t(producer);
}
//創建 消費者
for(int i=0;i<std::rand()%5+5;++i)
{
boost::thread t(consumer);
}
producer0.join();
return 0;
}wait 調用時 會 unlock mutex 使用其他線程能夠 lock wait 返回時 會 重新lock mutex 使用其他線程 無法lock
所謂讀寫鎖 既鎖定讀時不可寫 鎖定寫時不可讀 然可以被 多次 讀鎖定 thread 庫的 shared_mutex 提供了讀寫鎖功能 可以把 shared_mutex 完全當 mutex 使用 然此比 mutex 代價更高
//寫鎖定 lock try_lock timed_lock //解鎖 寫鎖定 unlock //讀鎖定 lock_shared try_lock_shared timed_lock_shared //解鎖 讀鎖定 unlock_shared //RAII 對象 構造時鎖定 析構 解鎖 shared_lock<shared_mutex> //讀 unique_lock<shared_mutex> //寫
#include<boost\thread.hpp>
int _tmain(int argc, _TCHAR* argv[])
{
boost::shared_mutex sm;
{
//寫鎖定
boost::unique_lock<boost::shared_mutex> wm(sm);
assert(!sm.try_lock());
assert(!sm.try_lock_shared());
}
{
//多次讀鎖定
boost::shared_lock<boost::shared_mutex> rm1(sm);
boost::shared_lock<boost::shared_mutex> rm2(sm);
assert(sm.try_lock_shared());
sm.unlock_shared();
assert(!sm.try_lock());
}
boost::unique_lock<boost::shared_mutex> wm(sm);
std::system("pause");
return 0;
}boost提供了如下 用於同步到 互斥量 mutex //獨占互斥量 try_mutex //同上 不過為兼容舊版本的 typedef timed_mutex //同上 不過提供了 超時 鎖定功能 recursive_mutex //同mutex 不過可以被多次鎖定(遞歸鎖定) 當然需要對應次數 解鎖 recursive_try_mutex //同上 不過為兼容就版本的 typedef recursive_timed_mutex //同上 不過提供超時鎖定功能 shared_mutex //讀寫鎖(或稱為 共享互斥量)
//鎖定互斥量 void lock() //解鎖互斥量 void unlock() //嘗試鎖定互斥量 成功返回 true 否則false bool try_lock() //傳入時間點 或 時間段 作為超時時間 鎖定互斥量 成功返回 true 超時 false bool timed_lock(...) //RAII 類對象 構造時 lock 互斥量 析構 unlock 互斥量 class boost::mutex::scoped_lock
#include<boost\thread.hpp>
int _tmain(int argc, _TCHAR* argv[])
{
boost::mutex m;
{
boost::mutex::scoped_lock lock(m);
assert(!m.try_lock());
}
assert(m.try_lock());
m.unlock();
std::system("pause");
return 0;
}在 win32 下 調用 mutex::unlock() 如果 mutex 處於 unlock 狀態 mutex 將會被 lock (此顯然是一個 bug version 1.55 and 1.61 確認)
thread_group 使用 std::list<thread*> 容納thread對象 使用其 接口join_all interrupt_all 可以對所有線程 進行 join interrupt 管理
//將一個 thread 加入 thread_group void add_thread(thread* thrd) //新啟一個線程 加入 thread_group 並返回其 thread* thread* create_thread(F threadfunc) //將一個 thread 從 thread_group中移除 不會delete void remove_thread(thread* thrd)
#include<boost\thread.hpp>
void work_fun(const int n)
{
try
{
while(true)
{
boost::this_thread::sleep(boost::posix_time::seconds(n));
}
}
catch(boost::thread_interrupted&)
{
std::cout<<"thread is end"<<std::endl;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::function<void()> f = boost::bind(work_fun,1);
boost::thread_group g;
for(unsigned int i=0;i<boost::thread::hardware_concurrency();++i)
{
g.create_thread(f);
}
boost::this_thread::sleep(boost::posix_time::seconds(2));
g.interrupt_all();
g.join_all();
std::system("pause");
return 0;
}thread_group 析構時 會自動delete所有 thread* 然remove_thread 移除的 thread× 不會被delete 需要自己 delete
線程中斷時個麻煩的事情 boost 提供了很好的解決方案 既中斷 interrupt 調用thread的 interrupt 將引發一個斷點 工作線程 try 捕獲此異常 便可知道 應該中斷 interrupt要使 線程拋出異常 線程函數中 必須有中斷點 (調用 interrupt後 線程執行到中斷的 才會拋出異常) 一下是 boost 預定義的中斷的 thread::join thread::time_join condition_variable::wait condition_variable::timed_wait condition_variable_any::wait condition_variable_any::timed_wait thread::sleep this_thread::sleep this_thread::interruption_point 以上皆是某種等待函數 只有interruption_point唯一的作用就是標識一個 中斷點
thread默認啟用中斷 然boost 以提供了接口 以控制線程 是否可被 interrupt 中斷 那麼誰怕測 boost::this_thread //返回 當前線程是否可被 中斷 bool interruption_enabled() //RAII類對象 構造關閉線程中斷功能 析構啟用線程中斷功能 class disable_interruption //RAII類對象 只能在 disable_interruption 生命期使用 構造啟用中斷功能 析構關閉中斷功能 class restore_interruption
#include<boost\thread.hpp>
void work_fun(const int n)
{
try
{
while(true)
{
boost::this_thread::sleep(boost::posix_time::seconds(n));
}
}
catch(boost::thread_interrupted&)
{
std::cout<<"thread is end"<<std::endl;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::function<void()> f = boost::bind(work_fun,5);
boost::thread t(f);
boost::this_thread::sleep(boost::posix_time::seconds(2));
t.interrupt();
t.join();
std::system("pause");
return 0;
}interrupt 只會通知線程 應該拋出中斷異常 而不會等待線程 結束 若需要等待線程 執行結束 join
#include <boost/thread.hpp> class thread 是thread庫的 核心實現 其負責創建 管理線程 thread 接受一個functor作為 構造參數 同時可傳入 9個參數 以供線程執行functor時 傳遞給 functor thread保存 functor 9個參數 的拷貝 若需要引用 使用 ref庫 thread一旦 構造 線程立刻執行 boost::thread不提供 start begin ... 等接口
//返回 thread是否 標識了一個 可執行線程(線程執行結束 或 detach 返回false) bool joinable() const //等待線程執行結束 void join(); timed_join //同上 不過接受一個 時間段 作為等待超時 //將thread與線程執行體分離 之後 thread不能再管理線程 然線程會繼續執行 void detach(); //返回線程id對象 支持copy compare id get_id() //放棄執行時間片 static void yield() //睡眠 傳入時間點 static inline void sleep(const system_time& xt) //返回cpu數 static unsigned hardware_concurrency() boost::this_thread 提供了三個函數 用於操作當前線程 sleep //可以是時間段 或 時間點 yield get_id
#include <boost/thread.hpp>
void work_fun(const int n)
{
for(int i=0;i<n;++i)
{
std::cout<<i<<"\n";
}
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::thread t(work_fun,5);
t.timed_join(boost::posix_time::seconds(1));
t.join();
std::system("pause");
return 0;
}