SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

インテルTBBを通じて学ぶ並列処理

スレッドセーフとインテルTBBのコンテナ

インテルTBBを通じて学ぶ並列処理(3)


  • X ポスト
  • このエントリーをはてなブックマークに追加

サイズ制限付き並列キュー:concurrent_bounded_queue

 TBBには限界値を設定できる並列キューconcurrent_bounded_queueが用意されています。なぜこのような機能を持つ並列キューが特別に用意されているのかと言うと、並列処理を長時間実行しているとバッファがオーバーする危険性があるからです。例えば、前項の例でポップ処理をするよりもプッシュ処理をする方が速いとします。この場合、キューの要素が増え続け、いつかメモリを食いつくしてしまうでしょう。このような状況を防ぎたい時、concurrent_bounded_queueが役に立ちます。まずは、サイズ制限付き並列キューの基本的な使い方を示したサンプルプロジェクトConcurrent_Bounded_Queueを見てください。

Concurrent_Bounded_Queue:サイズ制限付き並列キューの基本的な使い方
#include <iostream>
#include <windows.h>
#include <winnt.h>
#include <process.h>
#include "tbb/concurrent_queue.h"
#include "tbb/parallel_for.h"
#include "tbb/blocked_range.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tick_count.h"
#include "tbb/tbb_thread.h"
using namespace std;
using namespace tbb;

/*-------------------------------------------------------------------------------

    何らかの処理に基づき要素を決定してからプッシュを行うクラス

----------------------------------------------------------------------------------*/
class ParallelPush {
    int count;
    concurrent_bounded_queue<int>*  queue;
public:
    ParallelPush( int count, concurrent_bounded_queue<int>* queue ) :
      count( count ), queue( queue ) { };

    /*
        並列的にプッシュする
    */
    void operator()() const
    {
        concurrent_bounded_queue<int>* tmp = this->queue;
        for ( int i = 0; i < this->count; ++i ) {
            Sleep( 1 ); //何らかの処理をしていると仮定
            int value = i;
            bool flag = false;
            while ( !flag ) {
                flag = tmp->try_push( value );
                if ( !flag ) {
                    this_tbb_thread::yield();
                }
            }
        }
    };
};

/*-------------------------------------------------------------------------------

    キューをポップしてから表示を行うクラス

----------------------------------------------------------------------------------*/
class ParallelPop {
private:
    int count;
    concurrent_bounded_queue<int>*  queue;
public:
    ParallelPop( int count, concurrent_bounded_queue<int>* queue ) :
      count( count ), queue( queue ) { };

    /*
        並列的にポップする
    */
    void operator()() const
    {
        concurrent_bounded_queue<int>* tmp = this->queue;
        for ( int i = 0; i < this->count; ++i ) {
            int value = -1;
            while( value == -1 ) {
                tmp->try_pop( value );
                if ( value == -1 ) {
                    this_tbb_thread::yield();
                }
            }
            cout << value << " ";       
        }
        cout << endl;
    };
};


/*-------------------------------------------------------------------------------

    メインプログラム

----------------------------------------------------------------------------------*/
int main(void)
{

    //プッシュ&ポップ
    concurrent_bounded_queue<int> q;
    cout << "これからキューに10個の要素をプッシュしてからポップします・・・" << endl;
    for ( int i = 0; i < 10; ++i ) {
        q.push( i );
    }
    cout << "キューの要素:";
    for ( int i = 0; i < 10; ++i ) {
        int tmp = -1;
        q.pop( tmp );
        cout << tmp << " ";
    }
    cout << endl << endl;

    //並列的にプッシュ&ポップを行う
    int count = 100;
    cout << "これからキューに" << count << "個の要素をプッシュしてからポップします・・・" << endl;
    cout << "キューの要素:";
    concurrent_bounded_queue<int>* pq = new concurrent_bounded_queue<int>();
    pq->set_capacity( 1 );
    task_scheduler_init init;
    ParallelPush push( count, pq );
    ParallelPop pop( count, pq );
    tbb_thread pushThread( push );
    tbb_thread popThread( pop );
    pushThread.join();
    popThread.join();
   
    //終了
    cout << endl << endl;
    return 0;
}

 このサンプルが行っていることは基本的にConcurrent_Queueと同じです。違いはバッファがあふれないように工夫している点です。

 最初にpushメソッドとpopメソッドがある点に注目してください。concurrent_bounded_queueコンテナには、ブロックを行うpushメソッドとpopメソッド、およびブロックを行わないtry_pushメソッドとtry_popメソッドがあります。ブロックをするとは、処理を排他的に行いプッシュ/ポップ処理を行っている間、他の処理からの干渉を防ぐことを意味しています。この2種類のメソッドの使い分けが重要です。

 concurrent_bounded_queueコンテナは、set_capacityメソッドを使用して上限を決定できます。この後、限界値までプッシュをするとpushでは処理がブロックされ、キューの要素が減って無事プッシュ作業が終わるまでメソッドが返ってきません。一方、try_pushメソッドを使用すると、処理が成功した場合はtrue、失敗した場合はfalseの結果がすぐに返ってきます。

 今回のサンプルでは、処理がブロックされると困るので、try_pushメソッドを使用して、失敗した時にポップ処理ができるように他のTBBスレッドに処理をする権利を譲っています。こうすることによりキューの要素が減り、またプッシュ処理ができるようになります。

 TBBのスレッド処理について気になる方がいると思いますが、スレッドについての解説はこれ以降の連載で行います。次項では並列ハッシュマップについて解説します。

次のページ
並列ハッシュマップ:concurrent_hash_map

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
インテルTBBを通じて学ぶ並列処理連載記事一覧

もっと読む

この記事の著者

インドリ(インドリ)

分析・設計・実装なんでもありのフリーエンジニア。ブログ「無差別に技術をついばむ鳥(http://indori.blog32.fc2.com/)」の作者です。アドバイザーをしたり、システム開発したり、情報処理技術を研究したりと色々しています。座右の銘は温故知新で、新旧関係なく必要だと考えたものは全て学...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/4861 2010/04/27 12:09

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング