CodeZine(コードジン)

特集ページ一覧

RxJava 2.xを用いた簡単なサンプルプログラムの実装

RxJavaによるリアクティブプログラミング入門(4)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/11/09 14:00

目次

observeOnメソッド

 RxJavaでは、データを通知する側と受け取る側とで処理を別々のスレッドで実行させる場合、observerOnメソッドの引数にSchedulerというスレッド管理を行うオブジェクトを設定することで、どのようなスレッド上で処理を行うのかを指定します。また、observeOnメソッドにはSchedulerに加えて他の引数を取るものがあり、元になるのは次のobserveOnメソッドになります。

observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
observeOnメソッドの引数
引数No. 引数の型 説明
第1引数 Scheduler スレッド管理を行うクラス。
第2引数 boolean trueの場合はエラーが発生しても、そのことをすぐには通知せず、バッファしているデータを通知し終えてからエラーを通知する。falseの場合は発生したらすぐにエラーを通知する。デフォルトはfalse。
第3引数 int 通知待ちのデータをバッファするサイズ。デフォルトでは128。

 observeOnメソッドには、この第2引数と第3引数を省略してあるメソッドも用意してあり、その場合は各デフォルト値が設定された挙動になります。

 通知待ちのデータを扱う際に特に重要になるのが第3引数で、消費者に通知されるデータは、このバッファされた通知待ちのデータから取得されることになります。実際には、ここで指定した数値がFlowableに対してデータ数のリクエストを自動で行うようになっており、そのリクエストを受けて送られたデータがバッファされることになります。つまり「1」を指定すると、内部でrequest(1)が実行されていることになります。

 例えば、次のサンプルでは100ミリ秒ごとに0から始まる数値を通知するFlowableに対し、別スレッド上でデータを受け取るSubscriberが処理を300ミリ秒待ってから受け取ったデータを出力するようにしています。そのため、Flowableの処理がSubscriberの処理より早いので、通知待ちのデータが発生することになります。そこで、このサンプルでは通知待ちのデータを破棄するように設定しています。

public static void main(String[] args) throws Exception {
  
  Flowable<Long> flowable =
      // 100ミリ秒ごとに0から始まるデータを通知するFlowableを生成……(1)
      Flowable.interval(100L, TimeUnit.MILLISECONDS)
          // BackpressureMode.DROPを設定した時と同じ挙動にする……(2)
          .onBackpressureDrop();
  
  flowable
      // 非同期でデータを受け取るようにし、バッファサイズを1にする……(3)
      .observeOn(Schedulers.computation(), false, 1)
      // 購読する
      .subscribe(new DisposableSubscriber<Long>() {
        
        // データを受け取った際の処理
        @Override
        public void onNext(Long item) {
          // 300ミリ秒待つ……(4)
          try {
            Thread.sleep(300L);
          } catch (InterruptedException e) {
            e.printStackTrace();
            // 異常終了で終わる
            System.exit(1);
          }
          
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          // 受け取ったデータを出力する
          System.out.println(threadName + ": " + item);
        }
        
        …… 略
      });
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドは100ミリ秒ごとに0から始まるデータを通知するFlowableを生成するメソッド。指定した時間ごとのに「0」「1」「2」……と順に数値を通知するFlowableが生成される。
  2. バッファサイズを超えて生成されたデータを破棄するようなバックプレッシャーのモードを設定する。
  3. 消費者側の処理を生産者側とは異なるスレッドで実行するように設定し、バッファするサイズを「1」にする。
  4. 重い処理をしていると見なし、300ミリ秒ほどスレッドを止める。※本来はThreadのsleepメソッドをSubscriber内で使うべきではない。
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 12
RxComputationThreadPool-1: 16

 ※非同期処理のため実行するたびに結果は変わる可能性があります。

 このように通知されるまでに生成されたデータは破棄されていることが分かります。

 ところが次のように、このバッファサイズを「2」に変更すると、

flowable
  .observeOn(Schedulers.computation(), false, 2)

 次の結果になります。

実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 7
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 14

 ※非同期処理のため実行するたびに結果は変わる可能性があります。

 この結果より、リクエストするまでに時間があいているにも関わらず、「0」「1」と続けてデータが通知されていることから、リクエストした後に通知されるデータはキャッシュされたデータであり、キャッシュサイズを超えて通知待ちになっているデータが破棄されていることが分かります。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5