Asynchronous Agentsとは
英和辞典によりますと"agent"は代理業者/仲介人あるいは作用因子を意味するみたい。仕事を依頼すれば勝手にやっといてくれるイメージでしょうか。ASynchronous Agentsの主役はclass agent、agentから導出したクラスで純粋仮想関数:void run()を再定義しておき、インスタンスに対してstart()するとrun()が動き出します。ともかくもやってみましょう。
#include <iostream> #include <sstream> #include <string> #include <agents.h> class greeting_agent : public concurrency::agent { private: std::string message_; int repeat_; protected: void run() override { // message を repeat 回プリントする for ( int i = 0; i < repeat_; ++i ) { std::ostringstream stream; stream << message_ << std::endl; std::cout << stream.str(); } done(); // 終了時には必ず done() すべし! } public: greeting_agent(const std::string& message, int repeat) : message_(message), repeat_(repeat) {} }; int main() { greeting_agent american("Hello!", 5); greeting_agent german("Guten Tag!", 3); american.start(); german.start(); std::cout << "waiting...\n"; concurrency::agent::wait(&american); concurrency::agent::wait(&german); std::cout << "finished." << std::endl; }
agentをstart()すればrun()が動き出す、たったそれだけのことなのですが、main()はrun()の完了を待つことなくstart()から抜け出して"waiting..."を出力しています。各agentで再定義されたrun()はmain()とは異なるスレッドで非同期に動いているってことで、Asynchronous Agentsを使えばスレッドを意識せずにマルチスレッド・アプリケーションが作れるわけです。
agent::run()は引数も戻り値も持たないダルマさん、このままじゃつまんないので入力ポートをくっつけましょう。agentの入力ポートはISource<T>で、T receive(ISource<T>& source)で入力ポート:sourceに溜まったデータを受信することができます。
#include <iostream> #include <sstream> #include <string> #include <agents.h> class greeting_agent : public concurrency::agent { private: std::string title_; concurrency::ISource<std::string>& source_; protected: void run() override { std::string message; while ( !(message = concurrency::receive(source_)).empty() ) { std::ostringstream stream; stream << title_ << message << std::endl; std::cout << stream.str(); } done(); // 終了時には必ず done() すべし! } public: greeting_agent(concurrency::ISource<std::string>& source, const std::string& title) : source_(source), title_(title) {} }; int main() { //concurrency::overwrite_buffer<std::string> in_buffer; concurrency::unbounded_buffer<std::string> in_buffer; greeting_agent american(in_buffer, "Hello! "); american.start(); std::cout << "send strings ..." << std::endl; for ( int i = 0; i < 10; ++i ) { concurrency::send(in_buffer, std::to_string(i)); } concurrency::send(in_buffer, std::string()); // 終わりの合図:空文字列を送信 concurrency::agent::wait(&american); std::cout << "finished." << std::endl; }
ISourceT<T>が入力ポートなのに対しITarget<T>は出力ポートで、send(ITarget<T>& target, T value)でtargetにvalueを送信することができます。上記サンプルで使っているoverwrite_buffer<T>はISource<T>とITarget<T>の双方から導出されているのでsend/reciveのできる入出力ポートとして機能します。コンパイル/実行すると:
……あれま、投げ込んだデータの大半を取りこぼしています。main()から次々と投げ込まれるデータに受信が追いつけなかったみたい。overwrite_buffer<T>にsend()されたデータはreceive()が間に合わないと上書きされちゃうんです(だからoverwrite_buffer)。overwrite_buffer<T>改め、send()されたデータを貯めこんでくれるunbounded_buffer<T>に差し替えると:
Asynchronous Agentsではagentに対する入出力ポートをmessage-blockと称しています。複数のagentをmessage-blockで繋ぎ、message-blockにsendされたデータをagentがreceive()し/加工して/次段のmessage-blockにsend()することでデータの(処理の)流れ:データフローを組み立てます。ベルトコンベアと工作機械を交互に並べ、あっち端から材料を流し込むとこっち端から製品が出てくる、しかも工作機械は互いの同期を取らずひたすら受け取って/加工して/次のラインに流すわけ。以前に書いたアーティクル:『インテルTBB:GlowGraphによるデータフロープログラミング』と同じカラクリです。agentをいくつか用意してstart()すればそれぞれのrun()が空いたスレッドの中で勝手に動いてくれるのでプログラマは明示的にスレッドを起こす必要がありません。いくつのスレッドを起こし/どのスレッドで/どのagentを動かすかはAsynchronous Agentsライブラリに一任します。そのかわりそれぞれのrun()の中ではbusy-loopなど無駄に時間(CPUリソース)を食う処理はご法度、agentに対する入力/出力は極力message-blockに対するsend/receiveのみを使います。
Asynchronous Agentsの主役:agentの入出力ポートとして機能するのがmessage-blockであるoverwrite_buffer<T>やunbounded_buffer<T>ですが、加えてちょっと便利な脇役たちがいくつか用意されています。