サイズ制限付き並列キュー:concurrent_bounded_queue
TBBには限界値を設定できる並列キューconcurrent_bounded_queue
が用意されています。なぜこのような機能を持つ並列キューが特別に用意されているのかと言うと、並列処理を長時間実行しているとバッファがオーバーする危険性があるからです。例えば、前項の例でポップ処理をするよりもプッシュ処理をする方が速いとします。この場合、キューの要素が増え続け、いつかメモリを食いつくしてしまうでしょう。このような状況を防ぎたい時、concurrent_bounded_queue
が役に立ちます。まずは、サイズ制限付き並列キューの基本的な使い方を示したサンプルプロジェクトConcurrent_Bounded_Queue
を見てください。
#include <iostream> #include <windows.h> #include <winnt.h> #include <process.h> #include "tbb/concurrent_queue.h" #include "tbb/parallel_for.h" #include "tbb/blocked_range.h" #include "tbb/task_scheduler_init.h" #include "tbb/tick_count.h" #include "tbb/tbb_thread.h" using namespace std; using namespace tbb; /*------------------------------------------------------------------------------- 何らかの処理に基づき要素を決定してからプッシュを行うクラス ----------------------------------------------------------------------------------*/ class ParallelPush { int count; concurrent_bounded_queue<int>* queue; public: ParallelPush( int count, concurrent_bounded_queue<int>* queue ) : count( count ), queue( queue ) { }; /* 並列的にプッシュする */ void operator()() const { concurrent_bounded_queue<int>* tmp = this->queue; for ( int i = 0; i < this->count; ++i ) { Sleep( 1 ); //何らかの処理をしていると仮定 int value = i; bool flag = false; while ( !flag ) { flag = tmp->try_push( value ); if ( !flag ) { this_tbb_thread::yield(); } } } }; }; /*------------------------------------------------------------------------------- キューをポップしてから表示を行うクラス ----------------------------------------------------------------------------------*/ class ParallelPop { private: int count; concurrent_bounded_queue<int>* queue; public: ParallelPop( int count, concurrent_bounded_queue<int>* queue ) : count( count ), queue( queue ) { }; /* 並列的にポップする */ void operator()() const { concurrent_bounded_queue<int>* tmp = this->queue; for ( int i = 0; i < this->count; ++i ) { int value = -1; while( value == -1 ) { tmp->try_pop( value ); if ( value == -1 ) { this_tbb_thread::yield(); } } cout << value << " "; } cout << endl; }; }; /*------------------------------------------------------------------------------- メインプログラム ----------------------------------------------------------------------------------*/ int main(void) { //プッシュ&ポップ concurrent_bounded_queue<int> q; cout << "これからキューに10個の要素をプッシュしてからポップします・・・" << endl; for ( int i = 0; i < 10; ++i ) { q.push( i ); } cout << "キューの要素:"; for ( int i = 0; i < 10; ++i ) { int tmp = -1; q.pop( tmp ); cout << tmp << " "; } cout << endl << endl; //並列的にプッシュ&ポップを行う int count = 100; cout << "これからキューに" << count << "個の要素をプッシュしてからポップします・・・" << endl; cout << "キューの要素:"; concurrent_bounded_queue<int>* pq = new concurrent_bounded_queue<int>(); pq->set_capacity( 1 ); task_scheduler_init init; ParallelPush push( count, pq ); ParallelPop pop( count, pq ); tbb_thread pushThread( push ); tbb_thread popThread( pop ); pushThread.join(); popThread.join(); //終了 cout << endl << endl; return 0; }
このサンプルが行っていることは基本的にConcurrent_Queue
と同じです。違いはバッファがあふれないように工夫している点です。
最初にpush
メソッドとpop
メソッドがある点に注目してください。concurrent_bounded_queue
コンテナには、ブロックを行うpush
メソッドとpop
メソッド、およびブロックを行わないtry_push
メソッドとtry_pop
メソッドがあります。ブロックをするとは、処理を排他的に行いプッシュ/ポップ処理を行っている間、他の処理からの干渉を防ぐことを意味しています。この2種類のメソッドの使い分けが重要です。
concurrent_bounded_queue
コンテナは、set_capacity
メソッドを使用して上限を決定できます。この後、限界値までプッシュをするとpush
では処理がブロックされ、キューの要素が減って無事プッシュ作業が終わるまでメソッドが返ってきません。一方、try_push
メソッドを使用すると、処理が成功した場合はtrue、失敗した場合はfalseの結果がすぐに返ってきます。
今回のサンプルでは、処理がブロックされると困るので、try_push
メソッドを使用して、失敗した時にポップ処理ができるように他のTBBスレッドに処理をする権利を譲っています。こうすることによりキューの要素が減り、またプッシュ処理ができるようになります。
TBBのスレッド処理について気になる方がいると思いますが、スレッドについての解説はこれ以降の連載で行います。次項では並列ハッシュマップについて解説します。