Shoeisha Technology Media

CodeZine(コードジン)

記事種別から探す

スレッド間のデータ共有に使える「Microsoft PPLの並列コンテナ」

同時実行安全なvectorとqueue

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2017/02/02 14:00

 C++11で、<thread>、<future>、<mutex>などで定義されたスレッド・ライブラリが標準化されました。おかげで従来OSごとに異なるAPI(Linuxならpthread、WindowsならWinAPI)で実装してたスレッド周りが1つのコードで済むようになりました。しかしながら標準化されたのはスレッド絡みの基本的なものに限られていて、アプリケーション実装にはまだ足りないものが少なくありません。その一つがスレッド間のデータの共有(受け渡し)に使えるコンテナ(データの集合)です。そこでMicrosoft PPLが提供する並列コンテナを軽く紹介します。

目次

スレッド間のデータのやり取りにstd::vector<t>は向かない

 5年近く前のアーティクル:『C++11:スレッド・ライブラリひとめぐり』で、10万未満の素数の総数を求めるサンプルを示しました。これをベースに、素数の個数を求めるだけでなく、見つけた素数:int pを引数に関数オブジェクトf(int)を呼ぶよう少しばかり手を加えます。関数オブジェクト内でstd::vector<int<>に見つけた素数pを追加(push_back(p))します。シングルスレッドだとこんなコードになりますか。

list-01 prime.h
#ifndef PRIME_H_
#define PRIME_H_

#include <mutex>
#include <functional>
#include <cmath>

// nは素数?
inline bool is_prime(int n) {
  int sqn = (int)sqrt((float)n) + 1;
  if ( n == 2 || n == 3 ) return true; // 2,3は素数
  if ( n % 2 == 0 ) return false; // (2を除く)偶数は素数じゃない
  // 3以上の奇数で割れたら素数じゃない
  for ( int i = 3; i < sqn; i += 2 ) {
    if ( n % i == 0 ) {
      return false;
    }
  }
  return true;
}

// lo以上hi未満の素数 x に対し f(x) し、総数を返す
template<typename Function>
int count_prime(int lo, int hi, Function f) {
  int count = 0;
  for ( int i = lo; i < hi; ++i ) {
    if ( is_prime(i) ) {
      f(i);
      ++count;
    }
  }
  return count;
}

#endif
list-02 single.cpp
/* single-thread で素数を見つける */
#include <iostream>
#include <chrono>

#include "prime.h"

template<typename C> 
int single(int M, C& c) {
  using namespace std;
  chrono::high_resolution_clock::time_point start = chrono::high_resolution_clock::now();
  int count = count_prime(2, M, [&](int p) { c.push_back(p); } );
  chrono::high_resolution_clock::time_point stop  = chrono::high_resolution_clock::now();
  chrono::milliseconds duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
  cout << duration.count() << " [ms]" << endl;
  return count;
}

#include <vector>

int main() {
  using namespace std;
  const int M = 500000;
  vector<int> primes;
  int count = single(M, primes);
  if ( count == primes.size() ) {
    cout << count << " primes found." << endl;
  } else {
    cout << "oops, " << count << " != " << primes.size() << endl;
  }
}

 2以上M未満の素数がコレで求まります。範囲"2以上M未満"を等分に分割し、各範囲にある素数をそれぞれ独立したスレッドに見つけてもらいます。

list-03 div_range.h
#ifndef DIV_RANGE_H_
#define DIV_RANGE_H_

template<typename T =int>
class div_range {
private:
  T lo_;
  T hi_;
  T stride_;
  int n_;
public:
  div_range(T lo, T hi, int n) : lo_(lo), hi_(hi), n_(n) { stride_ = (hi - lo)/n; }
  T lo(int t) const { return lo_ + stride_ * t; }
  T hi(int t) const { return (++t < n_) ? lo_ + stride_ * t : hi_; }
};

#endif
list-04 multi.cpp
#include <iostream>
#include <chrono>
#include <thread>
#include <future>
#include <vector>

#include "prime.h"
#include "div_range.h"

template<typename Function> 
int multi(int M, Function f, int nthr) {
  using namespace std;
  vector<future<int>> futures;
  // 2以上M未満の範囲をnthr等分し、それぞれを個別のthreadに分担させる
  div_range<int> rng(2, M, nthr);
  chrono::high_resolution_clock::time_point start = chrono::high_resolution_clock::now();
  for ( int i = 0; i < nthr; ++i ) {
    // async()でthreadを起こす
    futures.emplace_back(async(count_prime<Function>, rng.lo(i), rng.hi(i), f));
  }
  // 各threadから得られた結果(素数の個数)を積算する
  int count = 0;
  for ( auto& fut : futures ) {
    count += fut.get();
  }
  chrono::high_resolution_clock::time_point stop  = chrono::high_resolution_clock::now();
  chrono::milliseconds duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
  cout << nthr << " thread(s) found " << count << " primes in " << duration.count() << " [ms]" << endl;
  return count;
}

int main() {
  using namespace std;
  const int M = 500000;

  vector<int> primes;

  for ( int nthr = 1; nthr <= 10; ++nthr ) {
    primes.clear();
    int count = multi(M, [&](int p) { primes.push_back(p); }, nthr);
    if ( count != primes.size() ) {
      cout << "oops, " << count << " != " << primes.size() << endl;
    }
  }
}

 実行したらば、こんな残念な結果となりました。

fig-1
fig-1

 正しく動いているならmulti()が返す素数の総数と vector内の要素数:primes.size()は一致するはずですが、マルチスレッドだと後者が少な目になっていますね。

 複数のスレッドが同時にpush_back()することでデータ競合(data race)が発生したのでしょう。最後まで動いただけでもラッキーです、push_back()の際にはvector内部でメモリの確保/解放およびポインタの付け替えが行われるため、メモリのアクセスエラーで異常停止しても不思議ではありませんからね。

 複数のスレッドがvectorに対し勝手気ままに要素を投げ込むのがトラブルの原因なのですから、1つのスレッドがpush_back()している間、他のスレッドがpush_back()できないようブロックしておけば正しい結果が得られるでしょう、関数オブジェクト内でmutexを使って排他制御してみましょうか。

list-05 multi.cpp(改)
...
#include <mutex>

int main() {
  using namespace std;
  const int M = 500000;
  vector<int> primes;
  mutex mtx;

  for ( int nthr = 1; nthr <= 10; ++nthr ) {
    primes.clear();
    int count = multi(M, [&](int p) { mtx.lock(); primes.push_back(p); mtx.unlock(); }, nthr);
    if ( count != primes.size() ) {
      cout << "oops, " << count << " != " << primes.size() << endl;
    }
  }
}
fig-2
fig-2

 1つのスレッドがvectorに要素を挿入(push_back)している間は他のスレッドのpush_backがmutexでブロックされて動けないためパフォーマンスはほんの少し低下しますけど、要素挿入の頻度がさほどに大きくなければ十分使えるでしょうね。

 しかしながら実際のアプリケーションでは条件がよりシビアになり得ます。スレッド間でコンテナが共有/共用されるシチュエーションでは、コンテナに要素を書いてるより、むしろ要素を読んでるスレッドの方が多くなることが考えられます。コンテナ内の要素を読んで一連の処理を行うということは、コンテナが提供するイテレータを介して要素にアクセスすることを意味します。

list-06
  for ( auto iter = c.begin(); iter != c.end(); ++iter ) {
    // *iterを読んでなんかする
  }

みたいにね。この for-loop が回っている間イテレータは常に有効でなくてはならず、イテレータの指す先が変化しては困ります。従ってコンテナ内の要素は他のスレッドが勝手に削除/移動してはなりません。

 この条件を満たすのに最も安直な解決策は、イテレータが使われている期間、上の例ではfor-loopが回っている間もmutexを用いることで他のスレッドからの割り込みをブロックすることなのですが、そんなことしちゃうとスレッドのどれか1つがコンテナを操作している間は他の全スレッドがコンテナ操作をブロックされ、シングルスレッドで動いてるのと大差なくなっちゃいます。mutexなどで排他制御する期間を極力小さく抑えないとマルチスレッドの旨味がなくなってしまいます。

 加えて困ったことに、std::vectorは要素の削除時だけじゃなく挿入時にもイテレータが無効となることがあります。std::vector<t> vがあって、std::vector<t>::iter = v.begin()によって先頭要素を指すイテレータを手に入れたのち、v.insert()/push_back()/emplace_back() を行うとその瞬間iterが無効となり得るんですわ。以下コードでその様子を観察しましょう。

list-07 contiguous.cpp
#include <iostream>
#include <vector>

using namespace std;

/* コンテナの先頭位置、および各要素の先頭からの距離(バイト数) */
template<typename C>
void print_layout(C& c) {
  for (int i = 0; i < 20; ++i) {
    c.push_back(i);
    char* frontptr = (char*)&c[0];
    cout << (void*)frontptr << " : ";
    for (int j = 0; j < c.size(); ++j) {
      cout << (int)((char*)&c[j] - frontptr) << " ";
    }
    cout << endl;
  }
}

int main() {

  std::vector<int> v;
  cout << "std::vector<int>\n";
  print_layout(v);

  concurrency::concurrent_vector<int> cv;
  cout << "\nconcurrency::concurrent_vector<int>\n";
  print_layout(cv);
}
fig-3
fig-3

 ほらね、push_back()すると各要素のアドレスが丸ごと書き換わることがあるんです。当然っちゃ当然の挙動でして、std::vector<t>内にはメモリ領域が確保されていて、その領域に要素列が格納されています。要素の挿入によって確保されているメモリ領域が溢れそうになると、より大きな(新たな)領域を確保し/現領域から新領域に要素列をコピーして/現領域を解放します。このときイテレータの指す先が解放されて無効となります。

 そんなわけで、スレッド間でデータをやり取りする手段としてstd::vector<t>は(排他すべき範囲があまりに広いせいで)向いてない、ということに。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

著者プロフィール

  • επιστημη(エピステーメー)

    C++に首まで浸かったプログラマ。 Microsoft MVP, Visual C++ (2004.01~) だったり わんくま同盟でたまにセッションスピーカやったり 中国茶淹れてにわか茶人を気取ってたり、 あと Facebook とか。 著書: - STL標準講座 (監修) -...

All contents copyright © 2006-2017 Shoeisha Co., Ltd. All rights reserved. ver.1.5