12. バリア
複数の処理を平行で行う場合、それをスレッドで実装すると、どのスレッドが早く終わるかはプログラムの処理に依存します。あるスレッドはDBを読み、あるスレッドは他サーバからの応答を待っているかもしれません。しかしどれか一つのスレッドが特定の処理を開始する時に、他のスレッドも特定処理を開始できる状況まで待ち、全員が揃って初めて処理を行いたい場合があります。
バリアは、ある特定の処理を行う時に全スレッドがその位置まで来ていることを保障するための機能です。
全スレッドで協調した処理を行いたい場合、大変便利で使える機能だと思います。
12.1 リファレンスとサンプル
主なbarrier
関数は次のとおりで、詳細は適時manなどで参照してください。
- int pthread_barrier_init ( pthread_barrier_t * barrier, const pthread_barrierattr_t * attr, unsigned int count );
- pthread_barrier_init:バリア用変数をスレッド属性に組み込みます。その際にバリア対象のスレッド数も渡します。
- int pthread_barrier_destroy ( pthread_barrier_t * barrier );
- pthread_barrier_destroy:バリア用変数を破棄します。
- int pthread_barrier_wait ( pthread_barrier_t * barrier );
- pthread_barrier_wait:バリア対象のスレッド数が到着するまでバリアを実行します。
サンプルは下記の通りです。カウンタの値が1,000,000で割り切れる時にバリアを行い、全スレッドが到達したら1秒sleepした後にメッセージを出力して再開しています。
/* gcc barrier_test.c -o barrier_test -W -Wall -g -lpthread */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <sys/time.h> #include <unistd.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_barrier_t barrier; #define THREAD_MAX 8 #define LOOP_CNT 10000000 #define TV2SEC(tv) ((double)((tv).tv_sec) + (double)((tv).tv_usec / 1000000.0)) void * thread_func( void * arg ); int main( ) { pthread_t id[THREAD_MAX]; double * result[THREAD_MAX]; int i; pthread_barrier_init( &barrier, 0, THREAD_MAX ); pthread_mutex_lock( &mutex ); for( i = 0; i < THREAD_MAX; i ++ ) { pthread_create( &id[i], 0, thread_func, ( void * )&i ); pthread_cond_wait( &cond, &mutex ); } pthread_mutex_unlock( &mutex ); for( i = 0; i < THREAD_MAX; i ++ ) { pthread_join( id[i], ( void ** )&result[i] ); } for( i = 0; i < THREAD_MAX; i ++ ) { printf( "thread %d end ... timesec:[%f]\n", i, *result[i] ); } return 0; } void * thread_func( void * arg ) { pthread_mutex_lock( &mutex ); int index = *( int * )arg; double * rtn = malloc( sizeof( double )); struct timeval tv1,tv2; int count, i; pthread_cond_signal( &cond ); pthread_mutex_unlock( &mutex ); gettimeofday( &tv1, 0 ); for( i = 0, count = 0; i < LOOP_CNT; i ++ ) { count ++; if( count % 1000 == 0 ) { printf( "thread %d \t\t%*.*s%4d\r", index, index + 1, index + 1, "\t\t\t\t\t\t\t\t", count / 1000 ); fflush( stdout ); } if( count % 1000000 == 0 ) { printf( "thread %d Barrier!!\n", index ); pthread_barrier_wait( &barrier ); sleep( 1 ); printf( "thread %d Barrier release!!\n", index ); pthread_barrier_wait( &barrier ); } } gettimeofday( &tv2, 0 ); *rtn = TV2SEC(tv2) - TV2SEC(tv1); return ( void * )rtn; }
12.2 注意点
バリアは登録されたスレッド数が到着するまでサスペンドしています。スレッドのクリティカルセクション中(mutex_lockとmutex_unlockによって囲まれた領域)にバリアを行われると、他のスレッドはmutex_lockが行えずにサスペンドが続くことになり、デッドロックが発生します。
バリアを設定する位置に注意してください。