Asynchronous Agentsの便利な脇役
call
出力ポートとしてふるまうオブジェクトで、call<T>にsend()された(型Tの)データはコンストラクタに与えた関数オブジェクトに引き渡されます。先のサンプル:greeting_input.cppではgreeting_agent::run()の中で直接std::coutに出力してますけど、この部分をITarget<std::string>へのsend()に置き換え、データをcall<std::string>に流してみましょう(greeting_agent自体は入出力が外付けとなったおかげでツブシが効きますね)。
#include <iostream> #include <sstream> #include <string> #include <agents.h> class greeting_agent : public concurrency::agent { private: std::string title_; concurrency::ISource<std::string>& source_; concurrency::ITarget<std::string>& target_; protected: void run() override { std::string message; while ( !(message = concurrency::receive(source_)).empty() ) { std::ostringstream stream; stream << title_ << message << std::endl; concurrency::send(target_, stream.str()); } done(); // 終了時には必ず done() すべし! } public: greeting_agent(concurrency::ISource<std::string>& source, concurrency::ITarget<std::string>& target, const std::string& title) : source_(source), target_(target), title_(title) {} }; int main() { concurrency::unbounded_buffer<std::string> in_buffer; // printerにsend()された文字列はcoutに出力される concurrency::call<std::string> printer([](const std::string& str) { std::cout << str; }); greeting_agent american(in_buffer, printer, "Hello! "); american.start(); std::cout << "send strings ..." << std::endl; for ( int i = 0; i < 10; ++i ) { concurrency::send(in_buffer, std::to_string(i)); } concurrency::send(in_buffer, std::string()); // 終わりの合図:空文字列を送信 concurrency::agent::wait(&american); std::cout << "finished." << std::endl; }
timer
一定周期で固定メッセージ(定数)をISource<T>にsend()するオブジェクトです。timer<T>を使って定期的にcall<T>に着火すれば"なんちゃってwall-clock"が作れます。
#define _CRT_SECURE_NO_WARNINGS #include <iostream> #include <chrono> #include <ctime> #include <thread> #include <agents.h> int main() { using namespace std; using namespace std::chrono; concurrency::call<int> wall_clock( // メッセージが届いたら現在時刻を表示する [](int) { time_t now = system_clock::to_time_t(system_clock::now()); cout << ctime(&now) << flush; }); // 1000msごとにwall_clockにメッセージを投げる concurrency::timer<int> tick(1000, // 1000[ms] ごとに 1, // 定数:1 を &wall_clock, // wall_clock に投げる true); // (falseだと一回だけ) tick.start(); this_thread::sleep_for(seconds(10)); // 10秒ほど寝る tick.stop(); }
transformer
データ変換器。transformer<T,U>に型Tのデータをsend()すると、コンストラクタに与えた関数オブジェクトで型Uに変換して次段に流してくれます。timer/transformer/callを繋いで作った"なんちゃってwall-clock"がコチラ。
#define _CRT_SECURE_NO_WARNINGS #include <iostream> #include <string> #include <chrono> #include <ctime> #include <thread> #include <agents.h> int main() { using namespace std; using namespace std::chrono; // メッセージをそのまま出力する concurrency::call<string> print( [](const string& str) { cout << str << flush; }); // head に現在時刻をくっつける concurrency::transformer<string,string> make_time( [](const string head) { time_t current = system_clock::to_time_t(system_clock::now()); return head + ctime(¤t); }, &print); // 1000msごとにwall_clockにメッセージを投げる concurrency::timer<string> tick( 1000, // 1000[ms] ごとに "只今: ", // 定数:"只今: " を &make_time, // make_time に投げる true); // (falseだと一回だけ) tick.start(); // timerを起動して this_thread::sleep_for(seconds(10)); // 10秒ほど寝る tick.stop(); }
transformerとagentを使って階乗を求めてみました。factorial_agentは整数ペア(x,y)をreceive()し、y == 0ならxを出力、y != 0なら(x*y,y-1)を入力に差し戻します。transformerで整数nを整数ペア(1,n)に変換して入力ポートに投げ込めば、(1,n)→(1*n,n-1)→(1*n*(n-1),n-2)→……を繰り返して最終的には(n!,0)が得られるというカラクリ。
#include <iostream> #include <utility> #include <agents.h> #include <conio.h> using namespace concurrency; typedef std::pair<int,int> pair_t; class factorial_agent : public agent { ISource<pair_t>& source_; ITarget<pair_t>& target_; ITarget<int>& result_; protected: void run() override { pair_t pair; while ( true ) { // pair: (x,y) を受信する pair = receive(source_); if ( pair.second == 0 ) { // (0,0) なら終了 if ( pair.first == 0 ) break; // (x,0) なら x を resultに送信 send(result_, pair.first); } else { // (x*y, y-1) をtargetに送信 send(target_, pair_t(pair.first*pair.second, pair.second-1)); } } done(); } public: factorial_agent(ISource<pair_t>& source, ITarget<pair_t>& target, ITarget<int>& result) : source_(source), target_(target), result_(result) {} }; int main() { // pair<int,int>のFIFOバッファ unbounded_buffer<pair_t> buffer; // 受信した int x を pair<int,int>(1,x) に変換して後段に中継する transformer<int, pair_t> make_pair([](int value) { return pair_t(1,value);}, &buffer); // 受信した int x を出力する call<int> print([](int value) { std::cout << value << std::endl; }); // 階乗agent factorial_agent factorial(buffer, buffer, print); factorial.start(); for ( int i = 1; i < 10; ++i ) { send(make_pair, i); } std::cout << "hit any kay to exit." << std::endl; _getch(); send(buffer, pair_t(0,0)); agent::wait(&factorial); }
make_join
message-block:mb0, mb1……を用意しておき、
auto joint = make_join(&mb0, &mb1,……); // 10個までOK auto msg = receive(joint);
すると、make_join()に与えた全message-blockに受信データが揃うのを待ち、各message-blockから受信したデータをstd::tupleにまとめてくれます。各データはstd::get<0>(msg), std::get<1>(msg)……で得られます。tupleなので受信するデータの型は異なっていても構いません。
make_join()を使ったサンプルがコチラ、キー入力と500ms毎の計時をそれぞれ別のスレッドで行い、両者が揃った時に現在時刻を出力します。
#define _CRT_SECURE_NO_WARNINGS #include <iostream> #include <string> #include <chrono> #include <ctime> #include <thread> #include <agents.h> #include <conio.h> int main() { using namespace std; using namespace std::chrono; concurrency::overwrite_buffer<time_t> tmbuf; concurrency::overwrite_buffer<char> kbbuf; bool stop = false; // 500msごとに現在時刻をtmbufに送信する thread tmthread([&]() { while (!stop) { this_thread::sleep_for(milliseconds(500)); time_t current = system_clock::to_time_t(system_clock::now()); concurrency::send(tmbuf,current); } }); // 200msごとにキー入力があればそのキーコードをkbbufに送信する thread kbthread([&]() { while ( !stop ) { if ( _kbhit() ) { concurrency::send(kbbuf, (char)_getch()); } else { this_thread::sleep_for(milliseconds(200)); } } }); cout << "--- hit any key to show current-time (ESC to quit)" << endl; auto joint = concurrency::make_join(&tmbuf, &kbbuf); while ( !stop ) { // 時刻とキー入力が揃うまで待ち、 auto msg = concurrency::receive(joint); // それぞれを出力する time_t time = get<0>(msg); char ch = get<1>(msg); if ( ch == 0x1b ) { stop = true; } else { cout << ch << " : " << ctime(&time); } } tmthread.join(); kbthread.join(); }
make_choice
make_join()がすべてのデータが受信されるのを待つのに対し、複数のmessage-blockのうちどれか1つにデータが届くのを待つのがmake_choice()。
auto selector = make_choice(&mb0, &mb1, ……); // 10個までOK size_t n = receive(selector);
make_choice()に与えたmessage-blockのうちどれか1つにデータが届いたらその番号が得られます。そこで改めて:
switch ( n ) { case 0: val0 = receive(mb0); break; case 1: val1 = receive(mb1); break; …… }
まとめ
以上、Asynchronous Agentsの構成要素をざっくりと紹介しました。
実際のところマルチスレッド・アプリケーションの設計/実装には厄介な問題がつきまといます。厄介事の多くはdata race(データ競合)に起因します。1つのデータを複数のスレッドが同時に読み書きすることで、つじつまが合わなくなるのがdata race、困ったことにdata raceは多くの場合再現性に乏しく原因を見つけ出すのも一苦労、data race発生が確率的しかもその確率は通常かなり小さいのです。mutexなどで排他制御するのがdata race回避の常套手段なのですが、排他制御はある区間において複数のスレッドが同時に動くことを抑止するものですから、安全性を求めるあまり排他区間を長くする、あるいは多くのスレッドが排他されるとマルチスレッドによるパフォーマンスを著しく阻害する一因となります。
Asynchronous Agentsは排他制御を極力排除したうえでより安全にかつ効率的にマルチスレッド・アプリケーションを実装する計算/処理モデル:データフローをライブラリ化したものです。実際のアプリケーション設計/実装にデータフローが使えるシチュエーションはそんなに多くはないでしょう。けれど効率的/スケーラブルなマルチスレッド・アプリケーション設計/実装の有効な手段として道具箱の隅に置いておくのも悪くはないんじゃないかしら。