CodeZine(コードジン)

特集ページ一覧

C++11 : スレッド・ライブラリひとめぐり

Visual Studio 2012 RC であそんでみたよ

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2012/06/29 14:00
目次

3:mutex/atomic

 1:threadで示したコードを少しばかりいじくります。lo以上hi未満の素数を勘定するcoutn_primeをばっさり削り、lambda内で直接やらせます。

リスト9
void multi(int M, int nthr) {
  vector<thread> thr(nthr);
  div_range<> rng(2,M,nthr);

  int result = 0;

  chrono::system_clock::time_point start = chrono::system_clock::now();
  for ( int t = 0; t < nthr; ++t ) {
    thr[t] = thread([&](int lo, int hi) {
                      for ( int n = lo; n < hi; ++n ) {
                        // nが素数なら resultに1を加える
                        if ( is_prime(n) ) {
                          ++result;
                        }
                      }
                    }, 
                    rng.lo(t), rng.hi(t));
  }
  for ( thread& th : thr ) { th.join(); }
  chrono::duration<double> sec = chrono::system_clock::now() - start;

  cout << result << ' ' << sec.count() << "[sec] : " << nthr <<  endl;
}

 うまいこと動いてくれそう...ですがコレ大きな穴が空いています。nが素数の時に++resultしてますが、++resultはresultを読んで「1を加えて/書き戻す処理」が行われます。読んでから書き戻すまでの間に他のスレッドが割り込むとresultの結果が狂ってしまう。data-race(データ競合)と呼ばれ、きわどいタイミングで発生するためになかなか再現しない厄介なバグです。こんなときに「こっからここまで、他のスレッドは入ってくるな(外で待ってろ)!」を実現するのがmutex(mutual exclusion:相互排他)。mutexをlock()してからunlock()までの間、他のスレッドはlock()地点でブロックされます。上の例でちゃんと動かすには、

リスト10
  ...
  mutex mtx;
  int result = 0;
  ...
     // nが素数なら resultに1を加える
     if ( is_prime(n) ) {
       mtx.lock();
       ++result;
       mtx.unlock();
     }
  ...

 あるいは

リスト11
  ...
  mutex mtx;
  int result = 0;
  ...
     // nが素数なら resultに1を加える
     if ( is_prime(n) ) {
       lock_guard<mutex> guard(mtx);
       ++result;
     }
  ...

 lock_guardはコンストラクト時にlock/デストラクト時にunlock()してくれるのでunlock()忘れがなくてお手軽です。途中で例外が発生しても確実にunlock()してくれますし。

 また、int,longなどのビルトイン型およびポインタ型に対して++,--,+=,-=,&=,|=などの演算を行う(極めて短い)間だけ、他スレッドの割り込みを抑止する際は高速/軽量なatomicがオススメです。上の例であれば、

リスト12
  ...
  atomic<int> result = 0;
  ...
     // nが素数なら resultに1を加える
     if ( is_prime(n) ) {
       ++result;
     }
  ...

 でOK。

4:condition_variable

 condition_variableを使って、あるスレッドから他のスレッドへイベントを投げることができます。イベントを待ち受けるスレッドでは、

リスト13
  mutex mtx;
  condition_variable cv;

  unique_lock<mutex> lock(mtx);
  ...
  cv.wait(lock);
  ...

 condition_bariablでwait()するとlockされたmutexをいったん解いて(unlockして)待ち状態となります。イベントを送出するスレッドでは、

リスト14
  unique_lock<mutex> lock(mtx);
  ...
  cv.notify_all();
  ...

 condition_bariablにnotify_all(またはnotify_one)すると待ち受け側のwait()が解けると同時にmutexが再度lockされるというカラクリ。

 これを使って素数を勘定している全スレッドの完了を待ち受けてみます。

リスト15
void multi(int M, int nthr) {
  vector<thread> thr(nthr);
  div_range<> rng(2,M,nthr);
  condition_variable cond;
  int finished = 0;
  atomic<int> result = 0;
  mutex mtx;

  chrono::system_clock::time_point start = chrono::system_clock::now();
  for ( int t = 0; t < nthr; ++t ) {
    thr[t] = thread([&](int lo, int hi) { 
                      for ( int n = lo; n < hi; ++n ) {
                        if ( is_prime(n) ) ++result; 
                      }
                      lock_guard<mutex> guard(mtx);
                      ++finished;
                      cond.notify_one();
                    },
                    rng.lo(t), rng.hi(t));
  }
  unique_lock<mutex> lock(mtx);
  // 全スレッドが++finishedすることでfinished==nthrとなるのを待つ
  cond.wait(lock, [&]() { return finished == nthr;});
  chrono::duration<double> sec = chrono::system_clock::now() - start;

  cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  for ( thread& th : thr ) { th.join(); }

}

 condition_variableを使った例をもうひとつ。ランデブー(rendezvous)あるいはバリア(barrier)と呼ばれる「待ち合わせ」のからくり。

リスト16
class rendezvous {
public:
  rendezvous(unsigned int count) 
    : threshold_(count), count_(count), generation_(0) {
    if (count == 0) { throw std::invalid_argument("count cannot be zero."); }
  }

  bool wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned int gen = generation_;
    if ( --count_ == 0) {
      generation_++;
      count_ = threshold_;
      condition_.notify_all();
      return true;
    }
    condition_.wait(lock, [&](){return gen != generation_;});
    return false;
  }

private:
  std::mutex mutex_;
  std::condition_variable condition_;
  unsigned int threshold_;
  unsigned int count_;
  unsigned int generation_;
};

 rendezvous r(5);のように、待ち合わせる人数(スレッド数)を引数としてコンストラクトしておきます。各スレッドがr.wait()すると全員が揃うまで待ち状態となり、最後のスレッドがr.wait()した途端全員のブロックが解けて一斉に動き出します。

 先ほどの全スレッドの完了待ちをrendezvousで実装すると、

リスト17
void multi(int M, int nthr) {
  vector<thread> thr(nthr);
  div_range<> rng(2,M,nthr);
  atomic<int> result = 0;
  rendezvous quit(nthr+1);

  chrono::system_clock::time_point start = chrono::system_clock::now();
  for ( int t = 0; t < nthr; ++t ) {
    thr[t] = thread([&](int lo, int hi) { 
                      for ( int n = lo; n < hi; ++n ) {
                        if ( is_prime(n) ) ++result; 
                      }
                      quit.wait();
                    },
                    rng.lo(t), rng.hi(t));
  }
  quit.wait(); // 全スレッドが wait するまで待つ
  chrono::duration<double> sec = chrono::system_clock::now() - start;

  cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  for ( thread& th : thr ) { th.join(); }

}

 C++11が提供するスレッド・ライブラリをざっくりと紹介しました。いかがでしょう、マルチスレッド・アプリケーションがずっとお手軽に書けるようになりました。CPUメーターが全コア振り切るHigh Performance Computimgのカイカンをお楽しみください。

  • LINEで送る
  • このエントリーをはてなブックマークに追加

著者プロフィール

  • επιστημη(エピステーメー)

    C++に首まで浸かったプログラマ。 Microsoft MVP, Visual C++ (2004.01~2018.06) "だった"り わんくま同盟でたまにセッションスピーカやったり 中国茶淹れてにわか茶人を気取ってたり、 あと Facebook とか。 著書: - STL標準...

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5