生産者と消費者
そんなわけで、スレッド間のデータの受け渡しを考えます。データを生産し送り出すスレッドをproducer(生産者)、データを受け取って消費するスレッドをconsumer(消費者)と名付けておきます。
producerからconsumerへデータを受け渡さにゃなりません。手始めにこんなコードではいかがでしょ。
/* producer-consumer間のデータ受け渡しのための'ハコ' */ template<typename T> class box { private: T value_; // ハコのナカミ public: box() {} box(const T& value) : value_(value) {} void set(const T& value) { value_ = value; } // 送出 const T& get() const { return value_; } // 受取 }; #include <iostream> #include <random> #include <string> void produce(box<int>& out) { using namespace std; const int N = 100000; random_device gen; uniform_int_distribution<int> dist(0,3); // 0~3のランダムな整数 auto rand = [&]() { return dist(gen); }; int sum = 0; for ( int i = 0; i < N; ++i ) { int value = rand(); out.set(value); // 送出 sum += value; } cout << to_string(N) + " items produced. sum= " + to_string(sum) + "\n"; } void consume(box<int>& in) { using namespace std; int count = 0; int sum = 0; while ( true ) { int value = in.get(); // 受取 if ( value < 0 ) break; // ここでloopを抜ける ++count; sum += value; } cout << to_string(count) + " items consumed. sum= " + to_string(sum) + "\n"; } #include <thread> #include <utility> int main() { box<int> b; std::thread producer(produce, std::ref(b)); std::thread consumer(consume, std::ref(b)); producer.join(); b.set(-1); // consumerを停止させるため consumer.join(); }
2つの関数produce()とconsume()を同時に起動すれば、やがて双方から同じ結果が出力される……わけがない。
producerはboxに送出したデータをconsumerが受け取ってくれたか確認もせぬまま、お構いなしに次のデータを送出してますからね。consumerも同罪です。送出されたか確認もせずに受け取ってます。
producerとconsumerが互いに同期、つまりタイミングを合わせて送出/受取を行わないとconsumerに届くデータは消失するわ重複するわでマトモな結果は得られません。producerは「読んでいいよ」、consumerは「書いていいよ」を相手に通知し、そしてproducerは「書いていいよ」、consumerは「読んでいいよ」の通知を待つ機構が必要です。
ならばこれではどうだろう……。
template<typename T> class box { private: T value_; // ハコのナカミ bool empty_; // 空か? public: box() : empty_(true) {} box(const T& value) : value_(value), empty_(false) {} bool empty() const { return empty_; } // 空ならtrue bool occupied() const { return !empty_; } // 空でないならtrue void set(const T& value) { empty_ = false; value_ = value; } // 送出:空じゃなくなる const T& get() { empty_ = true; return value_; } // 受取:空になる }; #include <thread> template<typename T> class concurrent_box { private: box<T> box_; // ハコのナカミ public: concurrent_box() : box_() {} concurrent_box(const T& value) : box_(value) {} void set(const T& value) { // 空じゃない間待つ while ( box_.occupied() ) { std::this_thread::yield(); } box_.set(value); } const T& get() { // 空である間待つ while ( box_.empty() ) { std::this_thread::yield(); } return box_.get(); } }; void produce(concurrent_box<int>& out) { ... out.set(データ); ... } void consume(concurrent_box<int>& in) { ... データ = in.get(); ... }
boxに空か否かを教えてくれるempty()、occupied()を用意し、boxを内包するconcurrent_boxは、set()内で空になるまでカラ回り、get()内で空じゃなくなるまでカラ回りさせてます。
カラ回りloop内のstd::this_thread::yield()を説明しておきます。スレッドには大きくWAIT/READY/RUNの3状態がありますよね。yield()はRUN状態にある現スレッド(this_thread)をいったんREADYに落とすことで他のREADYなスレッドにCPU(コア)を譲ります。条件が満たされるまで空loopをぶん回すと他のスレッドがなかなかRUN状態になれないのでいったんCPUを明け渡すんです。
感心しない実装ですねー。このloopは単に待つだけのためにアクセルべた踏みで空転させてる。電気代の無駄遣いです。本来なら条件が満たされるまでやることないんだからWAITしててほしいのですよ。
それともう一つ。このプログラム何度か動かすと、たまに結果が狂います。
concurrent_boxに対して複数のスレッドが同時にset()/get()することがあるんだから、このままじゃdata race起こすんですよ。set()/get()できるスレッドは高々1つとなるようガードせにゃならんです。concurrent_box内にstd::mutexを置いて、
template<typename T> class concurrent_box { private: box<T> box_; // ハコのナカミ std::mutex mtx_; public: concurrent_box() : box_() {} concurrent_box(const T& value) : box_(value) {} void set(const T& value) { std::unique_lock<std::mutex> guard(mtx_); // set中は邪魔するな // 空じゃない間待つ while ( box_.occupied() ) { std::this_thread::yield(); } box_.set(value); } const T& get() { std::unique_lock<std::mutex> guard(mtx_); // get中は邪魔するな // 空である間待つ while ( box_.empty() ) { std::this_thread::yield(); } return box_.get(); } };
残念でした、これでもやっぱりダメ。てかむしろ改悪なんですわ。get()はデータが入ってくるのを待ってます。ってことは、他のスレッドがset()してくれるのを待っているってことです。ところがget()/set()は1つのmutexでガードされているのでget()が終わらぬ限りset()されません。鍵かけた空箱にバナナが入るのをひたすら待ってる哀れなサル状態、しかも肝心の鍵はサルが握ってるというdeadlock状態。これを解消するには、データが入ってくるのを待ってる間はmutexのガードを解いておかねばならんのです。バナナが欲しけりゃ鍵開けとけと。
やれやれ、ようやく条件変数:condition_variableを語る準備ができました。マクラが長くてごめんなさいね(二度目)。