3:mutex/atomic
1:threadで示したコードを少しばかりいじくります。lo以上hi未満の素数を勘定するcoutn_primeをばっさり削り、lambda内で直接やらせます。
void multi(int M, int nthr) { vector<thread> thr(nthr); div_range<> rng(2,M,nthr); int result = 0; chrono::system_clock::time_point start = chrono::system_clock::now(); for ( int t = 0; t < nthr; ++t ) { thr[t] = thread([&](int lo, int hi) { for ( int n = lo; n < hi; ++n ) { // nが素数なら resultに1を加える if ( is_prime(n) ) { ++result; } } }, rng.lo(t), rng.hi(t)); } for ( thread& th : thr ) { th.join(); } chrono::duration<double> sec = chrono::system_clock::now() - start; cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl; }
うまいこと動いてくれそう...ですがコレ大きな穴が空いています。nが素数の時に++resultしてますが、++resultはresultを読んで「1を加えて/書き戻す処理」が行われます。読んでから書き戻すまでの間に他のスレッドが割り込むとresultの結果が狂ってしまう。data-race(データ競合)と呼ばれ、きわどいタイミングで発生するためになかなか再現しない厄介なバグです。こんなときに「こっからここまで、他のスレッドは入ってくるな(外で待ってろ)!」を実現するのがmutex(mutual exclusion:相互排他)。mutexをlock()してからunlock()までの間、他のスレッドはlock()地点でブロックされます。上の例でちゃんと動かすには、
... mutex mtx; int result = 0; ... // nが素数なら resultに1を加える if ( is_prime(n) ) { mtx.lock(); ++result; mtx.unlock(); } ...
あるいは
... mutex mtx; int result = 0; ... // nが素数なら resultに1を加える if ( is_prime(n) ) { lock_guard<mutex> guard(mtx); ++result; } ...
lock_guardはコンストラクト時にlock/デストラクト時にunlock()してくれるのでunlock()忘れがなくてお手軽です。途中で例外が発生しても確実にunlock()してくれますし。
また、int,longなどのビルトイン型およびポインタ型に対して++,--,+=,-=,&=,|=などの演算を行う(極めて短い)間だけ、他スレッドの割り込みを抑止する際は高速/軽量なatomicがオススメです。上の例であれば、
... atomic<int> result = 0; ... // nが素数なら resultに1を加える if ( is_prime(n) ) { ++result; } ...
でOK。
4:condition_variable
condition_variableを使って、あるスレッドから他のスレッドへイベントを投げることができます。イベントを待ち受けるスレッドでは、
mutex mtx; condition_variable cv; unique_lock<mutex> lock(mtx); ... cv.wait(lock); ...
condition_bariablでwait()するとlockされたmutexをいったん解いて(unlockして)待ち状態となります。イベントを送出するスレッドでは、
unique_lock<mutex> lock(mtx); ... cv.notify_all(); ...
condition_bariablにnotify_all(またはnotify_one)すると待ち受け側のwait()が解けると同時にmutexが再度lockされるというカラクリ。
これを使って素数を勘定している全スレッドの完了を待ち受けてみます。
void multi(int M, int nthr) { vector<thread> thr(nthr); div_range<> rng(2,M,nthr); condition_variable cond; int finished = 0; atomic<int> result = 0; mutex mtx; chrono::system_clock::time_point start = chrono::system_clock::now(); for ( int t = 0; t < nthr; ++t ) { thr[t] = thread([&](int lo, int hi) { for ( int n = lo; n < hi; ++n ) { if ( is_prime(n) ) ++result; } lock_guard<mutex> guard(mtx); ++finished; cond.notify_one(); }, rng.lo(t), rng.hi(t)); } unique_lock<mutex> lock(mtx); // 全スレッドが++finishedすることでfinished==nthrとなるのを待つ cond.wait(lock, [&]() { return finished == nthr;}); chrono::duration<double> sec = chrono::system_clock::now() - start; cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl; for ( thread& th : thr ) { th.join(); } }
condition_variableを使った例をもうひとつ。ランデブー(rendezvous)あるいはバリア(barrier)と呼ばれる「待ち合わせ」のからくり。
class rendezvous { public: rendezvous(unsigned int count) : threshold_(count), count_(count), generation_(0) { if (count == 0) { throw std::invalid_argument("count cannot be zero."); } } bool wait() { std::unique_lock<std::mutex> lock(mutex_); unsigned int gen = generation_; if ( --count_ == 0) { generation_++; count_ = threshold_; condition_.notify_all(); return true; } condition_.wait(lock, [&](){return gen != generation_;}); return false; } private: std::mutex mutex_; std::condition_variable condition_; unsigned int threshold_; unsigned int count_; unsigned int generation_; };
rendezvous r(5);のように、待ち合わせる人数(スレッド数)を引数としてコンストラクトしておきます。各スレッドがr.wait()すると全員が揃うまで待ち状態となり、最後のスレッドがr.wait()した途端全員のブロックが解けて一斉に動き出します。
先ほどの全スレッドの完了待ちをrendezvousで実装すると、
void multi(int M, int nthr) { vector<thread> thr(nthr); div_range<> rng(2,M,nthr); atomic<int> result = 0; rendezvous quit(nthr+1); chrono::system_clock::time_point start = chrono::system_clock::now(); for ( int t = 0; t < nthr; ++t ) { thr[t] = thread([&](int lo, int hi) { for ( int n = lo; n < hi; ++n ) { if ( is_prime(n) ) ++result; } quit.wait(); }, rng.lo(t), rng.hi(t)); } quit.wait(); // 全スレッドが wait するまで待つ chrono::duration<double> sec = chrono::system_clock::now() - start; cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl; for ( thread& th : thr ) { th.join(); } }
C++11が提供するスレッド・ライブラリをざっくりと紹介しました。いかがでしょう、マルチスレッド・アプリケーションがずっとお手軽に書けるようになりました。CPUメーターが全コア振り切るHigh Performance Computimgのカイカンをお楽しみください。