並列キュー:concurrent_queue
長時間保持せず作成したらすぐ使う時、先入れ先出し法(FIFO:First In、First Out)の待ち行列もあると便利です。TBBにはconcurrent_queue
とconcurrent_bounded_queue
の2つのスレッドセーフな並列キューが用意されています。
並列キューを理解するには、普通のキューを理解する必要があるため、まずはSTLのqueueコンテナアダプタを使った簡単なサンプルプロジェクトQueue
を使ってキューの解説をします。
#include <iostream> #include <queue> #include <list> using namespace std; int main(void) { //dequeを変形してキューを作成&操作 queue<int> q; for ( int i = 0; i < 10; ++i ) { q.push( i ); } cout << "dequeを変形したキューの要素:"; for ( int i = 0; i < 10; ++i ) { int tmp = q.front(); q.pop(); cout << tmp << " "; } cout << endl; //listを変形してキューを作成&操作 queue< int, list<int> > lq; for ( int i = 0; i < 10; ++i ) { lq.push( i ); } cout << "Listを変形したキューの要素:"; for ( int i = 0; i < 10; ++i ) { int tmp = lq.front(); lq.pop(); cout << tmp << " "; } cout << endl; //終了 cout << endl << endl; return 0; }
このサンプルは、1~10の数値をキューに格納して、それを1つずつ取り出して表示しています(下記の図を参照)。
キューはその名のとおり、格納された順に要素を取り出すデータ構造です。C++では高度なqueueコンテナアダプタが用意されており、list・dequeなどのコンテナを変形できます。しかし、現時点のTBBでは変形はできないので、コンテナアダプタではなくコンテナとして考えてください。
以上の説明でキューについての基礎知識が得られたと思うので、TBBの並列キューconcurrent_queue
をサンプルプロジェクトConcurrent_Queue
を交えて解説します。
#include <iostream> #include <windows.h> #include <winnt.h> #include <process.h> #include "tbb/concurrent_queue.h" #include "tbb/parallel_for.h" #include "tbb/blocked_range.h" #include "tbb/task_scheduler_init.h" using namespace std; using namespace tbb; /*------------------------------------------------------------------------------- 何らかの処理に基づき要素を決定してからプッシュを行うクラス ----------------------------------------------------------------------------------*/ class ParallelPush { int count; concurrent_queue<int>* queue; public: ParallelPush( int count, concurrent_queue<int>* queue ) : count( count ), queue( queue ) { }; /* 並列的にプッシュする */ void operator() ( const blocked_range<size_t>& range ) const { concurrent_queue<int>* tmp = this->queue; for ( int i = range.begin(); i != range.end(); ++i ) { int value = i; Sleep( 1 ); //何らかの処理をしていると仮定 tmp->push( value ); } }; }; /*------------------------------------------------------------------------------- キューをポップしてから表示を行うクラス ----------------------------------------------------------------------------------*/ class ParallelPop { private: int count; concurrent_queue<int>* queue; public: ParallelPop( int count, concurrent_queue<int>* queue ) : count( count ), queue( queue ) { }; /* 並列的にポップする */ void operator() ( const blocked_range<size_t>& range ) const { concurrent_queue<int>* tmp = this->queue; for ( int i = range.begin(); i != range.end(); ++i ) { int value = -1; //要素がプッシュされるまでポップを試みる while( value == -1 ) { tmp->try_pop( value ); } cout << value << " "; } cout << endl; }; }; /*------------------------------------------------------------------------------- メインプログラム ----------------------------------------------------------------------------------*/ int main(void) { //プッシュ&ポップ concurrent_queue<int> q; cout << "これからキューに10個の要素をプッシュしてからポップします・・・" << endl; for ( int i = 0; i < 10; ++i ) { q.push( i ); } cout << "キューの要素:"; for ( int i = 0; i < 10; ++i ) { int tmp = -1; q.try_pop( tmp ); cout << tmp << " "; } cout << endl << endl; //並列的にプッシュ&ポップを行う int count = 100; cout << "これからキューに" << count << "個の要素をプッシュしてからポップします・・・" << endl; cout << "キューの要素:"; concurrent_queue<int>* pq = new concurrent_queue<int>(); task_scheduler_init init; ParallelPush push( count, pq ); ParallelPop pop( count, pq ); parallel_for( blocked_range<size_t>( 0, count, 10000 ), push ); parallel_for( blocked_range<size_t>( 0, count, 10000 ), pop ); //終了 cout << endl << endl; return 0; }
このサンプルプロジェクトでは、先ほどの例と同じく逐次的にプッシュ&ポップを行う部分と、並列的にポップ&プッシュを行う部分があります。前のサンプルと併せてみると、並列キューとSTLのコンテナアダプタとの違いが出ています。その違いを元に並列キューの特徴を解説します。
1つ目に、ポップを行うメソッドがtry_pop
になっており、1つのメソッドで要素の取得が行われている点です。STLでは、front
メソッドで先頭の要素を取り出し、pop
メソッドで先頭要素を削除しています。しかし、concurrent_queue
コンテナは1つのメソッドで一連の動作を行います。この理由は、並列処理時にはアトミックな操作を行わなくてはならないからです。アトミックな操作とは、いくつかの操作を1つの操作に見える処理することです。分かりにくいと思うので、例を挙げます。
例えば、++iは一見単純な1つの命令ですが、実際は現在の値を取り出す・現在の値に1を加える・新しい値を返すの3つの働きをするものなので、並列処理時には思わぬ結果をもたらすことがあります。今2つのスレッドA・Bが同時に共有している変数iに、前置インクリメントを実行したとします。この時、スレッドAが先に実行して現在の値に1を加えている瞬間に、スレッドBが値を取り出すと変数iの値が1を加える前の状態になってしまっています。この状況になってしまうと、変数iの値はプログラマーの要想した結果とは異なってしまいます。このような並列処理で生じる不都合を防ぐために、TBBではアトミックな操作になっているのです。
2つ目にポップ方法が違います。逐次処理では必ずプッシュが成功しますが、並列処理時にはポップとプッシュの実行順序が保証されないので、プッシュされる前にポップ命令が実行される場合があります。そこで、concurrent_queue
のtry_pop
メソッドは、ポップが成功しない時は変数の値を変更しないで返すことになっています。そのため、このサンプルでは値の取得が成功するまでループを回しています。
3つ目は、サイズが符号付きであることです。STLのキューとは違い、並列キューのサイズはマイナス値になることがありえます。これは、並列処理時にはプッシュとポップの実行順序が保証されないからです。そのため、TBBの並列キューのサイズはプッシュ回数からポップ回数を引いたものだと定義されています。また、このサンプルでは行っていませんが、複数のスレッドからプッシュを行うと、キュー内の要素は必ずしもプログラマーの予想どおりにはなりません。この理由もやはり実行順序が保証されないからです。並列処理プログラミングで並列キューを使う時は、この点に注意してください。
最後にcout
の取り扱いについての注意するべき点を書きます。このサンプルでは単純化するためにロック処理をしていませんが、cout
はスレッドセーフではないので、並列処理時にcout
に出力を指定するとコンソール画面の表示が滅茶苦茶になります。試しにparallel_for( blocked_range( 0, count, 10000 ), pop );
の部分をparallel_for( blocked_range( 0, count, 1 ), pop );
に変えてみてください。何度も実行すると表示がおかしくなるのが確認できます。もしうまく行かない場合は、さらにcout << value << " ";
の部分を変更して、長い文字列を出力するようにしてください。
以上で並列キューの説明は終わりです。次項ではサイズ制限付き並列キューconcurrent_bounded_queue
について解説します。