SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

特集記事

C++11 : スレッド・ライブラリひとめぐり

Visual Studio 2012 RC であそんでみたよ

  • X ポスト
  • このエントリーをはてなブックマークに追加

3:mutex/atomic

 1:threadで示したコードを少しばかりいじくります。lo以上hi未満の素数を勘定するcoutn_primeをばっさり削り、lambda内で直接やらせます。

リスト9
void multi(int M, int nthr) {
  vector<thread> thr(nthr);
  div_range<> rng(2,M,nthr);

  int result = 0;

  chrono::system_clock::time_point start = chrono::system_clock::now();
  for ( int t = 0; t < nthr; ++t ) {
    thr[t] = thread([&](int lo, int hi) {
                      for ( int n = lo; n < hi; ++n ) {
                        // nが素数なら resultに1を加える
                        if ( is_prime(n) ) {
                          ++result;
                        }
                      }
                    }, 
                    rng.lo(t), rng.hi(t));
  }
  for ( thread& th : thr ) { th.join(); }
  chrono::duration<double> sec = chrono::system_clock::now() - start;

  cout << result << ' ' << sec.count() << "[sec] : " << nthr <<  endl;
}

 うまいこと動いてくれそう...ですがコレ大きな穴が空いています。nが素数の時に++resultしてますが、++resultはresultを読んで「1を加えて/書き戻す処理」が行われます。読んでから書き戻すまでの間に他のスレッドが割り込むとresultの結果が狂ってしまう。data-race(データ競合)と呼ばれ、きわどいタイミングで発生するためになかなか再現しない厄介なバグです。こんなときに「こっからここまで、他のスレッドは入ってくるな(外で待ってろ)!」を実現するのがmutex(mutual exclusion:相互排他)。mutexをlock()してからunlock()までの間、他のスレッドはlock()地点でブロックされます。上の例でちゃんと動かすには、

リスト10
  ...
  mutex mtx;
  int result = 0;
  ...
     // nが素数なら resultに1を加える
     if ( is_prime(n) ) {
       mtx.lock();
       ++result;
       mtx.unlock();
     }
  ...

 あるいは

リスト11
  ...
  mutex mtx;
  int result = 0;
  ...
     // nが素数なら resultに1を加える
     if ( is_prime(n) ) {
       lock_guard<mutex> guard(mtx);
       ++result;
     }
  ...

 lock_guardはコンストラクト時にlock/デストラクト時にunlock()してくれるのでunlock()忘れがなくてお手軽です。途中で例外が発生しても確実にunlock()してくれますし。

 また、int,longなどのビルトイン型およびポインタ型に対して++,--,+=,-=,&=,|=などの演算を行う(極めて短い)間だけ、他スレッドの割り込みを抑止する際は高速/軽量なatomicがオススメです。上の例であれば、

リスト12
  ...
  atomic<int> result = 0;
  ...
     // nが素数なら resultに1を加える
     if ( is_prime(n) ) {
       ++result;
     }
  ...

 でOK。

4:condition_variable

 condition_variableを使って、あるスレッドから他のスレッドへイベントを投げることができます。イベントを待ち受けるスレッドでは、

リスト13
  mutex mtx;
  condition_variable cv;

  unique_lock<mutex> lock(mtx);
  ...
  cv.wait(lock);
  ...

 condition_bariablでwait()するとlockされたmutexをいったん解いて(unlockして)待ち状態となります。イベントを送出するスレッドでは、

リスト14
  unique_lock<mutex> lock(mtx);
  ...
  cv.notify_all();
  ...

 condition_bariablにnotify_all(またはnotify_one)すると待ち受け側のwait()が解けると同時にmutexが再度lockされるというカラクリ。

 これを使って素数を勘定している全スレッドの完了を待ち受けてみます。

リスト15
void multi(int M, int nthr) {
  vector<thread> thr(nthr);
  div_range<> rng(2,M,nthr);
  condition_variable cond;
  int finished = 0;
  atomic<int> result = 0;
  mutex mtx;

  chrono::system_clock::time_point start = chrono::system_clock::now();
  for ( int t = 0; t < nthr; ++t ) {
    thr[t] = thread([&](int lo, int hi) { 
                      for ( int n = lo; n < hi; ++n ) {
                        if ( is_prime(n) ) ++result; 
                      }
                      lock_guard<mutex> guard(mtx);
                      ++finished;
                      cond.notify_one();
                    },
                    rng.lo(t), rng.hi(t));
  }
  unique_lock<mutex> lock(mtx);
  // 全スレッドが++finishedすることでfinished==nthrとなるのを待つ
  cond.wait(lock, [&]() { return finished == nthr;});
  chrono::duration<double> sec = chrono::system_clock::now() - start;

  cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  for ( thread& th : thr ) { th.join(); }

}

 condition_variableを使った例をもうひとつ。ランデブー(rendezvous)あるいはバリア(barrier)と呼ばれる「待ち合わせ」のからくり。

リスト16
class rendezvous {
public:
  rendezvous(unsigned int count) 
    : threshold_(count), count_(count), generation_(0) {
    if (count == 0) { throw std::invalid_argument("count cannot be zero."); }
  }

  bool wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned int gen = generation_;
    if ( --count_ == 0) {
      generation_++;
      count_ = threshold_;
      condition_.notify_all();
      return true;
    }
    condition_.wait(lock, [&](){return gen != generation_;});
    return false;
  }

private:
  std::mutex mutex_;
  std::condition_variable condition_;
  unsigned int threshold_;
  unsigned int count_;
  unsigned int generation_;
};

 rendezvous r(5);のように、待ち合わせる人数(スレッド数)を引数としてコンストラクトしておきます。各スレッドがr.wait()すると全員が揃うまで待ち状態となり、最後のスレッドがr.wait()した途端全員のブロックが解けて一斉に動き出します。

 先ほどの全スレッドの完了待ちをrendezvousで実装すると、

リスト17
void multi(int M, int nthr) {
  vector<thread> thr(nthr);
  div_range<> rng(2,M,nthr);
  atomic<int> result = 0;
  rendezvous quit(nthr+1);

  chrono::system_clock::time_point start = chrono::system_clock::now();
  for ( int t = 0; t < nthr; ++t ) {
    thr[t] = thread([&](int lo, int hi) { 
                      for ( int n = lo; n < hi; ++n ) {
                        if ( is_prime(n) ) ++result; 
                      }
                      quit.wait();
                    },
                    rng.lo(t), rng.hi(t));
  }
  quit.wait(); // 全スレッドが wait するまで待つ
  chrono::duration<double> sec = chrono::system_clock::now() - start;

  cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  for ( thread& th : thr ) { th.join(); }

}

 C++11が提供するスレッド・ライブラリをざっくりと紹介しました。いかがでしょう、マルチスレッド・アプリケーションがずっとお手軽に書けるようになりました。CPUメーターが全コア振り切るHigh Performance Computimgのカイカンをお楽しみください。

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
特集記事連載記事一覧

もっと読む

この記事の著者

επιστημη(エピステーメー)

C++に首まで浸かったプログラマ。Microsoft MVP, Visual C++ (2004.01~2018.06) "だった"りわんくま同盟でたまにセッションスピーカやったり中国茶淹れてにわか茶...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/6639 2012/06/29 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング