CodeZine(コードジン)

特集ページ一覧

RxJavaの非同期処理と新機能ParallelFlowable

RxJavaによるリアクティブプログラミング入門(7)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2017/09/13 14:00

目次

2.0.5より追加されたParallelFlowable

 RxJavaではバージョン2.0.5より新しい機能としてParallelFlowableが追加されました。この機能はFlowableにしかなく、mapオペレータやfilterオペレータなどのいくつかの機能を並列して実行できるようにする機能です。この並列して実行する機能(並列モード)とは、受け取ったデータを一つずつ順に実行するのではなく、受け取ったデータをそれぞれ異なるタイムライン(公式のサイトでは「rail(レール)」と呼ばれている)に分配して、複数のレール上で分配したデータを処理するようにすることです。ここでは、このParallelFlowableについて見ていきましょう。

 ※今回はRxJava 2.1.2で検証しています。

ParallelFlowableの概要

 並列モードで処理を行えるようにするには、まず元となるFlowableからParallelFlowableを生成しなければなりません。このParallelFlowableは、通常のFlowableと異なり、各データを異なるレールに分配して処理を行うようにするものです。つまり、ParallelFlowableは複数のSubscriberを持つことになります。

ParallelFlowableのデータ通知
ParallelFlowableのデータ通知

 これは複数のSubscriberに同じデータを通知する「Hot」なFlowableと異なり、1つのデータは1つのSubscriberのみに通知され、別のデータは別のSubscriberに通知されるように、各Subscriberは異なるデータを受け取ることになります。

 このParallelFlowableを生成するにはFlowableに追加されたメソッドであるparallel()を呼び出すか、ParallelFlowableのstaticな生成メソッドであるfrom(Publisher<T> flowable)を呼び出すことで、元のFlowableをParallelFlowableに変換することができます。また、引数に分岐するレールの個数(parallelism)も指定でき、指定をしない場合はその実行環境の論理プロセッサ数が設定されます。

主なParallelFlowableの生成メソッド
  • Flowable#parallel()
  • Flowable#parallel(int parallelism)
  • ParallelFlowable#from(Publisher<T> flowable)
  • ParallelFlowable#from(Publisher<T> flowable, int parallelism)

 ※PublisherはFlowableが実装しているインターフェース

 これらのデータを受け取った際の処理をそれぞれ異なるスレッド上で実行することで、あるデータの処理が終わっていなくても、次のデータの処理を実行することが可能になります。つまり、各レールの処理を異なるスレッド上で行うことで、異なるデータを受け取った複数の処理を並列して実行できるようになります。逆に、生成したParallelFlowableに対してSchedulerを設定せずに処理を行うと、データを通知するスレッドと同じスレッド上で処理を行うことになるため、1つのデータの処理が終わるまで次のデータの処理を始めることができなくなってしまいます。このParallelFlowableが処理を行うSchedulerを設定するにはParallelFlowableのrunOnメソッドを使います。

Scheduler設定メソッド
  • runOn(Scheduler scheduler)

 加えて、ParallelFlowableが異なるレール上で生成したデータをマージして、1つのレールに通知する通常のFlowableに変換することも可能です。それにより、オペレータ上での処理は並列で実行し、最終的にSubscriberに通知する際は1つのSubscriberだけに通知することが可能になります。

ParallelFlowableからFlowableの変換した際のデータ通知
ParallelFlowableからFlowableの変換した際のデータ通知

 このようにParallelFlowableからFlowableに変換するには、ParallelFlowableのsequential()を呼び出します。

Flowableへの変換メソッド
  • sequential()

 注意点として、並列処理を異なるスレッド上で実行する場合、後から通知されたデータの処理が先に終わることがあります。そのため、結果として通知されるデータは元のFlowableが通知したデータ順になっていない可能性があることを認識する必要があります。

ParallelFlowableを使った簡単なサンプル

 次のサンプルでは、「1」から「5」までの数値を通知するFlowableからprallelメソッドを使ってParallelFlowableを生成し、受け取ったデータを10倍にするよう異なるスレッド上で処理を行います。最後にそのParallelFlowableが処理を行ったデータをマージし、通知するFlowableに変換してデータを通知するようにしています。また、ParallelFlowableがデータを通知する際にdoOnNextメソッドを通して、どのスレッド上で処理を行っているのかを出力しています。

public static void main(String[] args) throws Exception {
  // 元となるFlowableからParallelFlowableを生成する
  ParallelFlowable<Integer> parallelFlowable =
      // 元となるFlowable
      Flowable.range(1, 5)
          // 並列モードにする
          .parallel()  // (1)
          // 異なるスレッド上で実行するようにする
          .runOn(Schedulers.computation())  // (2)
          // 各データを10倍にする
          .map(data -> data * 10)
          // 通知処理をしているスレッドを出力
          .doOnNext(data -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(
                "----- 通知: " + threadName + ": " + data);
          });
  
  // ParallelFlowableをFlowableに変換する
  Flowable<Integer> flowable = parallelFlowable.sequential();  // (3)
  
  // 購読する
  flowable.subscribe(
      // データ受取時の処理
      data -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ": start: " + data);
        Thread.sleep(10L);
        System.out.println(threadName + ": end: " + data);
      },
      // エラー時の処理
      e -> e.printStackTrace(),
      // 完了時の処理
      () -> System.out
          .println(Thread.currentThread().getName() + ": 完了"));
  
  // しばらく待つ
  Thread.sleep(1000L);
}
  1. FlowableをParallelFlowableに変換する
  2. ParallelFlowableが行う処理をSchedulers.computation()から取得したSchedulerが管理するスレッド上で処理を行うようにする
  3. ParallelFlowableをFlowableに変換する
実行結果
----- 通知: RxComputationThreadPool-4: 40
----- 通知: RxComputationThreadPool-3: 30
RxComputationThreadPool-4: start: 40
----- 通知: RxComputationThreadPool-1: 10
----- 通知: RxComputationThreadPool-2: 20
----- 通知: RxComputationThreadPool-1: 50
RxComputationThreadPool-4: end: 40
RxComputationThreadPool-4: start: 10
RxComputationThreadPool-4: end: 10
RxComputationThreadPool-4: start: 20
RxComputationThreadPool-4: end: 20
RxComputationThreadPool-4: start: 30
RxComputationThreadPool-4: end: 30
RxComputationThreadPool-4: start: 50
RxComputationThreadPool-4: end: 50
RxComputationThreadPool-4: 完了

 実行結果より、ParallelFloableから異なるスレッド上にあるレールにデータが別々に通知され、それぞれのレール上で受け取ったデータに値する処理を行い(mapメソッド)、sequentialメソッドでFlowableに戻すことで最終的に1つのSubscriberにデータが通知されていることがわかります。

まとめ

 今回はRxJavaの非同期処理について見てきました。基本的にRxJavaの非同期処理はsubscribeOnメソッドやobserveOnメソッドを使ってどこを非同期処理させるのかがポイントになります。また、オペレータ内で生成されるFlowable/Observableが異なるスレッド上で実行される場合についても見てきました。加えて、RxJava 2.0.5から新たに追加されたParallelFlowableについても簡単に紹介しました。ただし、このParallelFlowabeはまだベータ版なので、今後APIや挙動などが変わっていく可能性がある、という点については注意してください。次回はこのPrallelFlowableについても少し詳しく見ていきます。



  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5