CodeZine(コードジン)

特集ページ一覧

C++11:スレッド・ライブラリひとめぐり【補足編:3】

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2018/03/05 14:00
目次

condition_variable

 std::condition_variableに定義された主要メンバ関数は4つあり、そのうち2つは待つ側用、

  • void wait(unique_lock<mutex>& guard);
    guard.unlock()と同時に自分自身をブロックして待ち状態となり、ブロックが解けると同時にguard.lock()して戻ってきます。ブロック中はguardを解いてくれるのね。
  • template<class Predicate> void wait(unique_lock<mutex>& guard, Predicate pred);
    predは引数がなくてboolを返す関数オブジェクトで、待ちが解ける条件です。こいつのナカミは while ( !pred() ) { wait(guard); } つまり条件を満たさぬ間、ひたすらwait(guard) します。

 wait()によるブロックを解く通知側にも2つ、

  • void notify_one();
    wait()で待ってるスレッドのうち、どれか1つのブロックが解けます。どのスレッドが動けるようになるかは運次第。
  • void notify_all();
    wait()で待ってる全スレッドのブロックが解けます。

 condition_variableを使った"正しく動く"concurrent_boxはこんな実装になります。

list05
template<typename T>
class concurrent_box {
private:
  box<T> box_;    // ハコのナカミ
  std::mutex mtx_;
  std::condition_variable can_get_; // 条件変数:getできる
  std::condition_variable can_set_; // 条件変数:setできる
public:
  concurrent_box() : box_() {}
  concurrent_box(const T& value) : box_(value) {}

  void set(const T& value) {
    std::unique_lock<std::mutex> guard(mtx_); 
    // 'setできるよ' を待つ
    can_set_.wait(guard, [this]() { return box_.empty(); });
    box_.set(value);
    // 'getできるよ' を通知
    can_get_.notify_one();
  }

  T get() { 
    std::unique_lock<std::mutex> guard(mtx_); 
    // 'getできるよ' を待つ
    can_get_.wait(guard, [this]() { return box_.occupied(); });
    T value = box_.get();
    // 'setできるよ' を通知
    can_set_.notify_one();
    return value;
  }
};

 condition_variableを2つ使って、

  • get():データが入ってくるのを待つ/空になったことを通知する
  • set():空になるのを待つ/データが入ったことを通知する

ことでproducerとconsumerの同期を実現しています。"ハンドシェイク"と呼ばれる同期ですな。

fig04
fig04

 2つのproducerと2つのconsumerが1つのconcurrent_boxを介してデータを受け渡しています、電話回線2本で調理場に2人いるラーメン屋の様子です。注文を取りこぼすことなく処理できてます。

 調子こいてアレンジしてみましょう。concurrent_boxは受け渡せるデータが1個だけなのでconsumerがモタつく(getが遅れる)とproducerがそれに引きずられてモタつきます。ハコを改め待ち行列にすればconsumerが多少モタついてもproducerはデータを送出できます。待ち行列:concurrent_queueの実装はこんなカンジ。

list06
template<typename T>
class concurrent_queue {
public:
  typedef typename std::queue<T>::size_type size_type;
private:
  std::queue<T> queue_;
  size_type capacity_; // 待ち行列の最大長

  std::mutex mtx_;
  std::condition_variable can_pop_;
  std::condition_variable can_push_;
public:
  concurrent_queue(size_type capacity) : capacity_(capacity) {
    if ( capacity_ == 0 ) {
      throw std::invalid_argument("capacity cannot be zero.");
    }
  }

  void push(const T& value) {
    std::unique_lock<std::mutex> guard(mtx_); 
    can_push_.wait(guard, [this]() { return queue_.size() < capacity_; });
    queue_.push(value);
    can_pop_.notify_one();
  }

  T pop() { 
    std::unique_lock<std::mutex> guard(mtx_); 
    can_pop_.wait(guard, [this]() { return !queue_.empty(); });
    T value = queue_.front();
    queue_.pop();
    can_push_.notify_one();
    return value;
  }
};

 複数のwait()を解くnotify_all()を使ったサンプルは元記事で紹介済なので割愛御免。

あとがきにかえてオマケ:POSIX Threads for Windows

 thread suppport libraryの下調べの最中、LinuxではおなじみのpthreadをWindowsにportしたライブラリ「pthreads4w(POSIX Threads for Windows)」を見つけました。

 スレッド周りはLinuxとWindowsとで大きな違いがあるためにLinuxからWindowsへのコードの移植は少なからず面倒で、かくのごとくデリケートなスレッドでバグると目も当てられない悲惨な状況に追い込まれます。コードをいじらずほとんどそのままコンパイルできるpthreads4wはかなり重宝しています。pthreadでconcurrent_boxを作ってみました。サンプルファイルに入ってますよ。



  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:C++11:スレッド・ライブラリひとめぐり

著者プロフィール

  • επιστημη(エピステーメー)

    C++に首まで浸かったプログラマ。 Microsoft MVP, Visual C++ (2004.01~2018.06) "だった"り わんくま同盟でたまにセッションスピーカやったり 中国茶淹れてにわか茶人を気取ってたり、 あと Facebook とか。 著書: - STL標準...

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5