SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

特集記事

データフローの構築と実行をサポートする、Microsoft PPLの「Asynchronous Agents」

  • X ポスト
  • このエントリーをはてなブックマークに追加

Asynchronous Agentsの便利な脇役

call

 出力ポートとしてふるまうオブジェクトで、call<T>にsend()された(型Tの)データはコンストラクタに与えた関数オブジェクトに引き渡されます。先のサンプル:greeting_input.cppではgreeting_agent::run()の中で直接std::coutに出力してますけど、この部分をITarget<std::string>へのsend()に置き換え、データをcall<std::string>に流してみましょう(greeting_agent自体は入出力が外付けとなったおかげでツブシが効きますね)。

list-03 greeting_input_output.cpp
#include <iostream>
#include <sstream>
#include <string>
#include <agents.h>

class greeting_agent : public concurrency::agent {
private:
  std::string title_;
  concurrency::ISource<std::string>& source_;
  concurrency::ITarget<std::string>& target_;
protected:
  void run() override {
    std::string message;
    while ( !(message = concurrency::receive(source_)).empty() ) {
      std::ostringstream stream;
      stream << title_ << message << std::endl;
      concurrency::send(target_, stream.str());
    }
    done(); // 終了時には必ず done() すべし!
  }
public:
  greeting_agent(concurrency::ISource<std::string>& source, concurrency::ITarget<std::string>& target, const std::string& title) 
    : source_(source), target_(target), title_(title) {}
};

int main() {

  concurrency::unbounded_buffer<std::string> in_buffer;

  // printerにsend()された文字列はcoutに出力される
  concurrency::call<std::string> printer([](const std::string& str) { std::cout << str; });
  greeting_agent american(in_buffer, printer, "Hello! ");

  american.start();
  std::cout << "send strings ..." << std::endl; 
  for ( int i = 0; i < 10; ++i ) {
    concurrency::send(in_buffer, std::to_string(i));
  }
  concurrency::send(in_buffer, std::string()); // 終わりの合図:空文字列を送信

  concurrency::agent::wait(&american);

  std::cout << "finished." << std::endl;
}
fig-04
fig-04

timer

 一定周期で固定メッセージ(定数)をISource<T>にsend()するオブジェクトです。timer<T>を使って定期的にcall<T>に着火すれば"なんちゃってwall-clock"が作れます。

list-04 timer_call.cpp
#define _CRT_SECURE_NO_WARNINGS
#include <iostream>
#include <chrono>
#include <ctime>
#include <thread>
#include <agents.h>

int main() {
  using namespace std;
  using namespace std::chrono;
  concurrency::call<int> wall_clock(
    // メッセージが届いたら現在時刻を表示する
    [](int) {
      time_t now = system_clock::to_time_t(system_clock::now()); 
      cout << ctime(&now) << flush; 
    });

  // 1000msごとにwall_clockにメッセージを投げる
  concurrency::timer<int> tick(1000,        // 1000[ms] ごとに
                               1,           // 定数:1 を
                               &wall_clock, // wall_clock に投げる
                               true);       // (falseだと一回だけ)
  tick.start();

  this_thread::sleep_for(seconds(10)); // 10秒ほど寝る
  tick.stop();
}
fig-05
fig-05

transformer

 データ変換器。transformer<T,U>に型Tのデータをsend()すると、コンストラクタに与えた関数オブジェクトで型Uに変換して次段に流してくれます。timer/transformer/callを繋いで作った"なんちゃってwall-clock"がコチラ。

list-05 timer_transformer_call.cpp
#define _CRT_SECURE_NO_WARNINGS
#include <iostream>
#include <string>
#include <chrono>
#include <ctime>
#include <thread>
#include <agents.h>

int main() {
  using namespace std;
  using namespace std::chrono;

  // メッセージをそのまま出力する
  concurrency::call<string> print(
    [](const string& str) {
      cout << str << flush; 
    });

  // head に現在時刻をくっつける
  concurrency::transformer<string,string> make_time(
    [](const string head) { 
      time_t current = system_clock::to_time_t(system_clock::now()); 
      return head + ctime(&current);
    }, 
    &print);

  // 1000msごとにwall_clockにメッセージを投げる
  concurrency::timer<string> tick(
    1000,     // 1000[ms] ごとに
    "只今: ", // 定数:"只今: " を
    &make_time,  // make_time に投げる
    true);       // (falseだと一回だけ)

  tick.start(); // timerを起動して
  this_thread::sleep_for(seconds(10)); // 10秒ほど寝る
  tick.stop();

}
fig-06
fig-06

 transformerとagentを使って階乗を求めてみました。factorial_agentは整数ペア(x,y)をreceive()し、y == 0ならxを出力、y != 0なら(x*y,y-1)を入力に差し戻します。transformerで整数nを整数ペア(1,n)に変換して入力ポートに投げ込めば、(1,n)→(1*n,n-1)→(1*n*(n-1),n-2)→……を繰り返して最終的には(n!,0)が得られるというカラクリ。

list-06 factorial.cpp
#include <iostream>
#include <utility>
#include <agents.h>
#include <conio.h>

using namespace concurrency;

typedef std::pair<int,int> pair_t;

class factorial_agent : public agent {
  ISource<pair_t>& source_;
  ITarget<pair_t>& target_;
  ITarget<int>&    result_;
protected:
  void run() override {
    pair_t pair;
    while ( true ) {
      // pair: (x,y) を受信する
      pair = receive(source_);
      if ( pair.second == 0 ) {
        // (0,0) なら終了
        if ( pair.first == 0 ) break;
        // (x,0) なら x を resultに送信
        send(result_, pair.first);
      } else {
        // (x*y, y-1) をtargetに送信
        send(target_, pair_t(pair.first*pair.second, pair.second-1));
      } 
    }
    done();
  }
public:
  factorial_agent(ISource<pair_t>& source,
                  ITarget<pair_t>& target,
                  ITarget<int>&                result)
    : source_(source), target_(target), result_(result) {}
};

int main() {

  // pair<int,int>のFIFOバッファ
  unbounded_buffer<pair_t> buffer;

  // 受信した int x を pair<int,int>(1,x) に変換して後段に中継する
  transformer<int, pair_t> make_pair([](int value) { return pair_t(1,value);},
                                     &buffer);

  // 受信した int x を出力する
  call<int> print([](int value) { std::cout << value << std::endl; });

  // 階乗agent
  factorial_agent factorial(buffer, buffer, print);

  factorial.start();

  for ( int i = 1; i < 10; ++i ) {
    send(make_pair, i);
  }

  std::cout << "hit any kay to exit." << std::endl;
  _getch();
  send(buffer, pair_t(0,0));
  agent::wait(&factorial);

}
fig-07
fig-07
fig-08
fig-08

make_join

 message-block:mb0, mb1……を用意しておき、

auto joint = make_join(&mb0, &mb1,……); // 10個までOK
auto msg = receive(joint);

すると、make_join()に与えた全message-blockに受信データが揃うのを待ち、各message-blockから受信したデータをstd::tupleにまとめてくれます。各データはstd::get<0>(msg), std::get<1>(msg)……で得られます。tupleなので受信するデータの型は異なっていても構いません。

 make_join()を使ったサンプルがコチラ、キー入力と500ms毎の計時をそれぞれ別のスレッドで行い、両者が揃った時に現在時刻を出力します。

list-07 make_join.cpp
#define _CRT_SECURE_NO_WARNINGS
#include <iostream>
#include <string>
#include <chrono>
#include <ctime>
#include <thread>
#include <agents.h>
#include <conio.h>

int main() {
  using namespace std;
  using namespace std::chrono;

  concurrency::overwrite_buffer<time_t> tmbuf;
  concurrency::overwrite_buffer<char>   kbbuf;
  bool stop = false;

  // 500msごとに現在時刻をtmbufに送信する
  thread tmthread([&]() { 
                    while (!stop) { 
                      this_thread::sleep_for(milliseconds(500));
                      time_t current = system_clock::to_time_t(system_clock::now());
                      concurrency::send(tmbuf,current);
                    }
                  });

  // 200msごとにキー入力があればそのキーコードをkbbufに送信する
  thread kbthread([&]() {
                    while ( !stop ) {
                      if ( _kbhit() ) {
                         concurrency::send(kbbuf, (char)_getch());
                      } else {
                        this_thread::sleep_for(milliseconds(200));
                      }
                    }
                  });

  cout << "--- hit any key to show current-time (ESC to quit)" << endl;

  auto joint = concurrency::make_join(&tmbuf, &kbbuf);
  while ( !stop ) {
    // 時刻とキー入力が揃うまで待ち、
    auto msg = concurrency::receive(joint);
    // それぞれを出力する
    time_t time = get<0>(msg);
    char   ch   = get<1>(msg);
    if ( ch == 0x1b ) { stop = true; }
    else { cout << ch << " : " << ctime(&time); }
  }

  tmthread.join();
  kbthread.join();
}
fig-09
fig-09

make_choice

 make_join()がすべてのデータが受信されるのを待つのに対し、複数のmessage-blockのうちどれか1つにデータが届くのを待つのがmake_choice()。

auto selector = make_choice(&mb0, &mb1, ……); // 10個までOK
size_t n = receive(selector);

 make_choice()に与えたmessage-blockのうちどれか1つにデータが届いたらその番号が得られます。そこで改めて:

switch ( n ) {
case 0: val0 = receive(mb0); break;
case 1: val1 = receive(mb1); break;
……
}

まとめ

 以上、Asynchronous Agentsの構成要素をざっくりと紹介しました。

 実際のところマルチスレッド・アプリケーションの設計/実装には厄介な問題がつきまといます。厄介事の多くはdata race(データ競合)に起因します。1つのデータを複数のスレッドが同時に読み書きすることで、つじつまが合わなくなるのがdata race、困ったことにdata raceは多くの場合再現性に乏しく原因を見つけ出すのも一苦労、data race発生が確率的しかもその確率は通常かなり小さいのです。mutexなどで排他制御するのがdata race回避の常套手段なのですが、排他制御はある区間において複数のスレッドが同時に動くことを抑止するものですから、安全性を求めるあまり排他区間を長くする、あるいは多くのスレッドが排他されるとマルチスレッドによるパフォーマンスを著しく阻害する一因となります。

 Asynchronous Agentsは排他制御を極力排除したうえでより安全にかつ効率的にマルチスレッド・アプリケーションを実装する計算/処理モデル:データフローをライブラリ化したものです。実際のアプリケーション設計/実装にデータフローが使えるシチュエーションはそんなに多くはないでしょう。けれど効率的/スケーラブルなマルチスレッド・アプリケーション設計/実装の有効な手段として道具箱の隅に置いておくのも悪くはないんじゃないかしら。

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
特集記事連載記事一覧

もっと読む

この記事の著者

επιστημη(エピステーメー)

C++に首まで浸かったプログラマ。Microsoft MVP, Visual C++ (2004.01~2018.06) "だった"りわんくま同盟でたまにセッションスピーカやったり中国茶淹れてにわか茶...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/10003 2017/03/28 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング