condition_variable
std::condition_variableに定義された主要メンバ関数は4つあり、そのうち2つは待つ側用、
-
void wait(unique_lock<mutex>& guard);
guard.unlock()と同時に自分自身をブロックして待ち状態となり、ブロックが解けると同時にguard.lock()して戻ってきます。ブロック中はguardを解いてくれるのね。 -
template<class Predicate> void wait(unique_lock<mutex>& guard, Predicate pred);
predは引数がなくてboolを返す関数オブジェクトで、待ちが解ける条件です。こいつのナカミは while ( !pred() ) { wait(guard); } つまり条件を満たさぬ間、ひたすらwait(guard) します。
wait()によるブロックを解く通知側にも2つ、
-
void notify_one();
wait()で待ってるスレッドのうち、どれか1つのブロックが解けます。どのスレッドが動けるようになるかは運次第。 -
void notify_all();
wait()で待ってる全スレッドのブロックが解けます。
condition_variableを使った"正しく動く"concurrent_boxはこんな実装になります。
template<typename T> class concurrent_box { private: box<T> box_; // ハコのナカミ std::mutex mtx_; std::condition_variable can_get_; // 条件変数:getできる std::condition_variable can_set_; // 条件変数:setできる public: concurrent_box() : box_() {} concurrent_box(const T& value) : box_(value) {} void set(const T& value) { std::unique_lock<std::mutex> guard(mtx_); // 'setできるよ' を待つ can_set_.wait(guard, [this]() { return box_.empty(); }); box_.set(value); // 'getできるよ' を通知 can_get_.notify_one(); } T get() { std::unique_lock<std::mutex> guard(mtx_); // 'getできるよ' を待つ can_get_.wait(guard, [this]() { return box_.occupied(); }); T value = box_.get(); // 'setできるよ' を通知 can_set_.notify_one(); return value; } };
condition_variableを2つ使って、
- get():データが入ってくるのを待つ/空になったことを通知する
- set():空になるのを待つ/データが入ったことを通知する
ことでproducerとconsumerの同期を実現しています。"ハンドシェイク"と呼ばれる同期ですな。
2つのproducerと2つのconsumerが1つのconcurrent_boxを介してデータを受け渡しています、電話回線2本で調理場に2人いるラーメン屋の様子です。注文を取りこぼすことなく処理できてます。
調子こいてアレンジしてみましょう。concurrent_boxは受け渡せるデータが1個だけなのでconsumerがモタつく(getが遅れる)とproducerがそれに引きずられてモタつきます。ハコを改め待ち行列にすればconsumerが多少モタついてもproducerはデータを送出できます。待ち行列:concurrent_queueの実装はこんなカンジ。
template<typename T> class concurrent_queue { public: typedef typename std::queue<T>::size_type size_type; private: std::queue<T> queue_; size_type capacity_; // 待ち行列の最大長 std::mutex mtx_; std::condition_variable can_pop_; std::condition_variable can_push_; public: concurrent_queue(size_type capacity) : capacity_(capacity) { if ( capacity_ == 0 ) { throw std::invalid_argument("capacity cannot be zero."); } } void push(const T& value) { std::unique_lock<std::mutex> guard(mtx_); can_push_.wait(guard, [this]() { return queue_.size() < capacity_; }); queue_.push(value); can_pop_.notify_one(); } T pop() { std::unique_lock<std::mutex> guard(mtx_); can_pop_.wait(guard, [this]() { return !queue_.empty(); }); T value = queue_.front(); queue_.pop(); can_push_.notify_one(); return value; } };
複数のwait()を解くnotify_all()を使ったサンプルは元記事で紹介済なので割愛御免。
あとがきにかえてオマケ:POSIX Threads for Windows
thread suppport libraryの下調べの最中、LinuxではおなじみのpthreadをWindowsにportしたライブラリ「pthreads4w(POSIX Threads for Windows)」を見つけました。
スレッド周りはLinuxとWindowsとで大きな違いがあるためにLinuxからWindowsへのコードの移植は少なからず面倒で、かくのごとくデリケートなスレッドでバグると目も当てられない悲惨な状況に追い込まれます。コードをいじらずほとんどそのままコンパイルできるpthreads4wはかなり重宝しています。pthreadでconcurrent_boxを作ってみました。サンプルファイルに入ってますよ。