CodeZine(コードジン)

特集ページ一覧

RxJavaの非同期処理と新機能ParallelFlowable

RxJavaによるリアクティブプログラミング入門(7)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2017/09/13 14:00

目次

オペレータ内で生成される非同期のFlowable/Observable

 RxJavaのメソッドの中には、flatMapメソッドのようにオペレータの中でFlowable/Observableを生成し、それを起動してデータを通知するメソッドがあります。その際、そこで生成しているFlowable/Observableが別スレッド上で実行される場合、データを受け取りそこで生成したFlowable/Observableを起動するところまでは順番に実行されますが、起動されたFlowable/Observableはそれぞれ異なるスレッド上で処理を行います。つまり、使っているメソッドによっては複数のFlowable/Observableが異なるスレッド上で同時に実行されることを意味します。そのため、メソッドによってはデータを受け取った順にFlowable/Observableが生成されたとしても、その結果、データが受け取った順に通知されることが保証されません。ここでは、内部でFlowable/Observableを生成するオペレータの中で、flatMapメソッド、concatMapメソッド、concatMapEagerメソッドを見ていき、異なるスレッド上で処理を行うFlowable/Observableを生成した場合に、どのような通知処理の違いがあるかを見ていきます。

flatMapメソッド

 flatMapメソッドはデータを受け取った際、新たにFlowable/Observableを生成して実行し、通知されたデータを結果として通知するオペレータです。そこで生成されるFlowable/Observableが異なるスレッド上で処理を行い、受け取るデータが連続してくる場合、生成したFlowable/Observableが他のFlowable/Observableの処理が終わっているのかどうか関係なく実行され、それぞれの処理が終わったタイミングで、結果としてのデータが通知されます。そのため、最終的に通知するデータはデータを受け取った順に実行されるとは限りません。

flatMapメソッドの場合
flatMapメソッドの場合

 例えば、justメソッドから生成した「A」「B」「C」と通知するFlowableがあり、flatMapメソッド内で受け取ったデータからdelayメソッドを使ったFlowableを生成し、各データを1000ミリ秒遅らせて通知しようとしているとします。その場合、順番にデータを受け取り、そのデータからFlowableを生成して起動するまで、ほぼ時間をかけずに実行できます。つまり、ほぼ同時に異なるスレッド上で処理を行う複数のFlowableを実行することになります。しかし、JavaではCPUの負荷などの影響もあり正確な時間で処理を行うことはできません。スレッドによっては処理のスピードが遅くなることもあります。それにより、遅れたスレッド上のデータより別のスレッド上のデータが先に通知され、通知される順がデータを受け取った順と同じになるとは限らなくなります。

flatMapメソッド内で異なるスレッド上で動くFlowableを生成した場合
public static void main(String[] args) throws Exception {
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.just("A", "B", "C")
          // 受け取ったデータからFlowableを生成し、それが持つデータを通知する
          .flatMap(data -> {
            // 1000ミリ秒遅れてデータを通知するFlowableを生成
            return Flowable.just(data).delay(1000L, TimeUnit.MILLISECONDS);
          });
  
  // 購読する
  flowable.subscribe(data -> {
    String threadName = Thread.currentThread().getName();
    System.out.println(threadName + ": " + data);
  });
  
  // しばらく待つ
  Thread.sleep(2000L);
}
実行結果
RxComputationThreadPool-1: A
RxComputationThreadPool-3: C
RxComputationThreadPool-2: B

 実行結果より、通知されているデータは元のデータを受け取った順になっていないことがわかります。データの順番は関係なく、パフォーマンスを重視する場合はflatMapメソッドが使えますが、データの順番が重要である場合はflatMapメソッドは使わない方がよいでしょう。

concatMapメソッド

 concatMapメソッドは受け取ったデータから内部でFlowable/Observableを生成し、それを一つずつ順番に実行し、通知されたデータを結果として通知するオペレータです。そのため、そこで生成されるFlowable/Observableがそれぞれ異なるスレッド上で処理を行っても影響を受けず、データを受け取った順に新たなFlowable/Observableのデータを通知するようになります。

concatMapメソッドの場合
concatMapメソッドの場合

 例えば「A」「B」「C」と通知するFlowableがあり、concatMapメソッド内で受け取ったデータからdelayメソッドを使ったFlowableを生成し、そのFlowableが1000ミリ秒遅らせてデータを通知しようとしているとします。その場合、データを受け取ってからFlowableを生成して起動し、そのFlowableの処理が終わるまで次のデータのFlowableを生成しません。Flowableが異なるスレッド上で処理を行っても、そのことに関係なく、受け取ったデータの順番で新しいデータを通知することになります。

concatMapメソッド内で異なるスレッド上で動くFlowableを生成した場合
public static void main(String[] args) throws Exception {
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.just("A", "B", "C")
          // 受け取ったデータからFlowableを生成し、それが持つデータを通知する
          .concatMap(data -> {
            // 1000ミリ秒遅れてデータを通知するFlowableを生成
            return Flowable.just(data).delay(1000L, TimeUnit.MILLISECONDS);
          });
  
  // 購読する
  flowable.subscribe(data -> {
    String threadName = Thread.currentThread().getName();
    String time =
        LocalTime.now().format(DateTimeFormatter.ofPattern("ss.SSS"));
    System.out.println(threadName + ": data=" + data + ", time=" + time);
  });
  
  // しばらく待つ
  Thread.sleep(4000L);
}
実行結果
RxComputationThreadPool-1: data=A, time=18.152
RxComputationThreadPool-2: data=B, time=19.228
RxComputationThreadPool-3: data=C, time=20.230

 実行結果より、元のデータが通知された順に新しいデータを通知することがわかります。そして、データを受け取った間隔がほぼ1秒(1000ミリ秒)ごとになっているのがわかります。このため、パフォーマンスに関係なくデータの順番を重視する場合はconcatMapメソッドが使えます。しかし、パフォーマンスが重要な場合はconcatMapメソッドは使わない方がよいことになります。

concatMapEagerメソッド

 concatMapEagerメソッドはデータを受け取ったら、新たにFlowable/Observableを生成してすぐに実行し、そのFlowable/Observableが通知するデータを元のデータを受け取った順に通知するオペレータです。そこで生成されるFlowable/Observableが異なるスレッド上で処理を行う場合、生成したFlowable/ObservableはflatMapメソッドのように同時に実行されることがあります。しかし、結果として通知するデータはconcatMapメソッドのように元のデータを受け取った順になります。

concatMapEagerメソッドの場合
concatMapEagerメソッドの場合

 例えば「A」「B」「C」と通知するFlowableがあり、concatMapEagerメソッド内で受け取ったデータからdelayメソッドを使ったFlowableを生成し、そのFlowableが1000ミリ秒遅らせてデータを通知しようとしているとします。その場合、順番にデータを受け取り、そのデータからFlowableを生成して起動するまで、ほぼ時間をかけずに実行できます。しかし、結果として通知するデータは内部でFlowableが生成された順のため、通知されるまで生成されたデータがバッファされることになります。

concatMapEagerメソッド内で異なるスレッド上で動くFlowableを生成した場合
public static void main(String[] args) throws Exception {
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.just("A", "B", "C")
          // 受け取ったデータからFlowableを生成し、それが持つデータを通知する
          .concatMapEager(data -> {
            // 1000ミリ秒遅れてデータを通知するFlowableを生成
            return Flowable.just(data).delay(1000L, TimeUnit.MILLISECONDS);
          });
  
  // 購読する
  flowable.subscribe(data -> {
    String threadName = Thread.currentThread().getName();
    String time =
        LocalTime.now().format(DateTimeFormatter.ofPattern("ss.SSS"));
    System.out.println(threadName + ": data=" + data + ", time=" + time);
  });
  
  // しばらく待つ
  Thread.sleep(2000L);
}
実行結果
RxComputationThreadPool-1: data=A, time=20.340
RxComputationThreadPool-1: data=B, time=20.353
RxComputationThreadPool-1: data=C, time=20.354

 実行結果より、元のデータが通知された順に新しいデータを通知することがわかります。そして、データがほぼ同時に通知されていることもわかります。このため、データの順番とパフォーマンスの両方を重視する場合に、concatMapEagerメソッドは適しているといえます。しかし、内部で生成されたFlowable/Observableが完了を通知までに、次以降のFlowable/Observableが生成したデータをバッファすることになるので、大量のデータがバッファされるとMissingBackpressureExceptionが発生するリスクや、メモリが足りなくなるリスクがあります。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5