SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

特集記事

データフローの構築と実行をサポートする、Microsoft PPLの「Asynchronous Agents」

  • このエントリーをはてなブックマークに追加

 前回、Microsft PPL(Parallel Patterns LIbrary)の並列コンテナを紹介しました。PPLには他にも面白いマルチスレッドのからくりが仕込まれています。その中からデータフローの構築と実行をサポートしてくれるAsynchronous Agents(非同期エージェント)を軽くひとめぐり。

  • このエントリーをはてなブックマークに追加

Asynchronous Agentsとは

 英和辞典によりますと"agent"は代理業者/仲介人あるいは作用因子を意味するみたい。仕事を依頼すれば勝手にやっといてくれるイメージでしょうか。ASynchronous Agentsの主役はclass agent、agentから導出したクラスで純粋仮想関数:void run()を再定義しておき、インスタンスに対してstart()するとrun()が動き出します。ともかくもやってみましょう。

list-01 greeting.cpp
#include <iostream>
#include <sstream>
#include <string>
#include <agents.h>

class greeting_agent : public concurrency::agent {
private:
  std::string message_;
  int         repeat_;
protected:
  void run() override {
    // message を repeat 回プリントする
    for ( int i = 0; i < repeat_; ++i ) {
      std::ostringstream stream;
      stream << message_ << std::endl;
      std::cout << stream.str();
    }
    done(); // 終了時には必ず done() すべし!
  }
public:
  greeting_agent(const std::string& message, int repeat)
    : message_(message), repeat_(repeat) {}
};

int main() {
  greeting_agent american("Hello!", 5);
  greeting_agent german("Guten Tag!", 3);

  american.start();
  german.start();

  std::cout << "waiting...\n";

  concurrency::agent::wait(&american);
  concurrency::agent::wait(&german);

  std::cout << "finished." << std::endl;
}
fig-01
fig-01

 agentをstart()すればrun()が動き出す、たったそれだけのことなのですが、main()はrun()の完了を待つことなくstart()から抜け出して"waiting..."を出力しています。各agentで再定義されたrun()はmain()とは異なるスレッドで非同期に動いているってことで、Asynchronous Agentsを使えばスレッドを意識せずにマルチスレッド・アプリケーションが作れるわけです。

 agent::run()は引数も戻り値も持たないダルマさん、このままじゃつまんないので入力ポートをくっつけましょう。agentの入力ポートはISource<T>で、T receive(ISource<T>& source)で入力ポート:sourceに溜まったデータを受信することができます。

list-02 greeting_input.cpp
#include <iostream>
#include <sstream>
#include <string>
#include <agents.h>

class greeting_agent : public concurrency::agent {
private:
  std::string title_;
  concurrency::ISource<std::string>& source_;
protected:
  void run() override {
    std::string message;
    while ( !(message = concurrency::receive(source_)).empty() ) {
      std::ostringstream stream;
      stream << title_ << message << std::endl;
      std::cout << stream.str();
    }
    done(); // 終了時には必ず done() すべし!
  }
public:
  greeting_agent(concurrency::ISource<std::string>& source, const std::string& title) 
    : source_(source), title_(title) {}
};

int main() {
//concurrency::overwrite_buffer<std::string> in_buffer;
  concurrency::unbounded_buffer<std::string> in_buffer;
  greeting_agent american(in_buffer, "Hello! ");

  american.start();
  std::cout << "send strings ..." << std::endl; 
  for ( int i = 0; i < 10; ++i ) {
    concurrency::send(in_buffer, std::to_string(i));
  }
  concurrency::send(in_buffer, std::string()); // 終わりの合図:空文字列を送信

  concurrency::agent::wait(&american);

  std::cout << "finished." << std::endl;
}

 ISourceT<T>が入力ポートなのに対しITarget<T>は出力ポートで、send(ITarget<T>& target, T value)でtargetにvalueを送信することができます。上記サンプルで使っているoverwrite_buffer<T>はISource<T>とITarget<T>の双方から導出されているのでsend/reciveのできる入出力ポートとして機能します。コンパイル/実行すると:

fig-02
fig-02

 ……あれま、投げ込んだデータの大半を取りこぼしています。main()から次々と投げ込まれるデータに受信が追いつけなかったみたい。overwrite_buffer<T>にsend()されたデータはreceive()が間に合わないと上書きされちゃうんです(だからoverwrite_buffer)。overwrite_buffer<T>改め、send()されたデータを貯めこんでくれるunbounded_buffer<T>に差し替えると:

fig-03
fig-03

 Asynchronous Agentsではagentに対する入出力ポートをmessage-blockと称しています。複数のagentをmessage-blockで繋ぎ、message-blockにsendされたデータをagentがreceive()し/加工して/次段のmessage-blockにsend()することでデータの(処理の)流れ:データフローを組み立てます。ベルトコンベアと工作機械を交互に並べ、あっち端から材料を流し込むとこっち端から製品が出てくる、しかも工作機械は互いの同期を取らずひたすら受け取って/加工して/次のラインに流すわけ。以前に書いたアーティクル:『インテルTBB:GlowGraphによるデータフロープログラミング』と同じカラクリです。agentをいくつか用意してstart()すればそれぞれのrun()が空いたスレッドの中で勝手に動いてくれるのでプログラマは明示的にスレッドを起こす必要がありません。いくつのスレッドを起こし/どのスレッドで/どのagentを動かすかはAsynchronous Agentsライブラリに一任します。そのかわりそれぞれのrun()の中ではbusy-loopなど無駄に時間(CPUリソース)を食う処理はご法度、agentに対する入力/出力は極力message-blockに対するsend/receiveのみを使います。

 Asynchronous Agentsの主役:agentの入出力ポートとして機能するのがmessage-blockであるoverwrite_buffer<T>やunbounded_buffer<T>ですが、加えてちょっと便利な脇役たちがいくつか用意されています。

会員登録無料すると、続きをお読みいただけます

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

次のページ
Asynchronous Agentsの便利な脇役

この記事は参考になりましたか?

  • このエントリーをはてなブックマークに追加
特集記事連載記事一覧

もっと読む

この記事の著者

επιστημη(エピステーメー)

C++に首まで浸かったプログラマ。Microsoft MVP, Visual C++ (2004.01~2018.06) "だった"りわんくま同盟でたまにセッションスピーカやったり中国茶淹れてにわか茶...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/10003 2017/03/28 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング