SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

RxJavaによるリアクティブプログラミング入門

RxJava(2.x)の便利なオペレータ(結合/ユーティリティ/デバッグ)

RxJavaによるリアクティブプログラミング入門(6)

  • X ポスト
  • このエントリーをはてなブックマークに追加

デバッグ用のオペレータ(2)

doOnCancel/doOnDispose

マーブルダイアグラム
マーブルダイアグラム
Flowableのメソッド
  • doOnCancel(Action onCancel)
Observableのメソッド
  • doOnDispose(Action onDispose)

 doOnCancelメソッドはFlowableの購読が解除された際に、doOnDisposeメソッドはObservableの購読が解除された際に、引数に指定した関数型インターフェースの処理を行わせるメソッドです。ただし、doOnCancel/doOnDisposeメソッドは途中で購読解除されたのではなく、完了やエラーで終了した場合は実行されないので注意が必要です。また、実装する、関数型インターフェースのメソッドの引数には何も渡されません。

「do」の名前で始まるオペレータを使ったサンプル

正常終了時のサンプル

 次のサンプルではintervalメソッドで生成したFlowableに対し、takeメソッドを使って3件まで出力するようにしています。そのFlowableに対し、各通知に対する「do」で名前が始まるメソッドを使って通知時のログを出力するようにしています。

 また、データを受け取り処理をするSubscriberでは、1件ずつデータを通知するようにリクエストをし、各通知を受ける度にこちらでもログを出力するようにしています。

サンプル
public static void main(String[] args) throws Exception {
  Flowable.interval(500L, TimeUnit.MILLISECONDS)
      // 3件まで
      .take(3)
      // データ通知時のログ
      .doOnNext(
          data -> System.out.println("doOnNext: data=" + data))
      // 完了時のログ
      .doOnComplete(() -> System.out.println("doOnComplete"))
      // エラー時のログ
      .doOnError(
          error -> System.out.println("doOnError: error=" + error))
      // 購読開始時のログ
      .doOnSubscribe(
          subscription -> System.out.println("doOnSubscribe"))
      // データ数のリクエスト時のログ
      .doOnRequest(
          size -> System.out.println("doOnRequest: size=" + size))
      // 購読解除時のログ
      .doOnCancel(() -> System.out.println("doOnCancel"))
      // 購読する
      .subscribe(new Subscriber<Long>() {
        
        private Subscription subscription;
        
        @Override
        public void onSubscribe(Subscription subscription) {
          System.out.println("購読開始");
          this.subscription = subscription;
          subscription.request(1L);
        }
        
        @Override
        public void onNext(Long data) {
          System.out.println("データ=" + data);
          subscription.request(1L);
        }
        
        @Override
        public void onError(Throwable error) {
          System.out.println("エラー=" + error);
        }
        
        @Override
        public void onComplete() {
          System.out.println("完了");
        }
      });
  
  // しばらく待つ
  Thread.sleep(2000L);
}
実行結果
doOnSubscribe
購読開始
doOnRequest: size=1
doOnNext: data=0
データ=0
doOnRequest: size=1
doOnNext: data=1
データ=1
doOnRequest: size=1
doOnNext: data=2
データ=2
doOnRequest: size=1
doOnComplete
完了

 実行結果より、まずdoOnSubscribeメソッドが実行された後にSubscriberのonSubscribeメソッドが呼ばれていることがわかります。そして、そこでデータ数のリクエストを行うとdoOnRequestメソッドが実行されています。そして、データを通知する際にdoOnNextメソッドが呼ばれ、その後にSubscriberがデータを受け取り、SubscriberのonNextメソッドを実行していることがわかります。それを繰り返し、全てのデータを通知し終えたら、doOnCompleteメソッドが実行され、SubscriberのonCompleteメソッドが実行されていることがわかります。

 また、doOnCancelメソッドが実行されていないことから、完了時にはdoOnCancelメソッドが呼ばれないことがわかります。

購読解除時のサンプル

 次のサンプルではintervalメソッドで生成したFlowableに対し、takeメソッドで5件まで出力するようにしています。そのFlowableに対し、各通知に対する「do」で名前が始まるメソッドを使って通知時のログを出力するようにしています。ただし、途中で購読を解除するようにしています。

サンプル
public static void main(String[] args) throws Exception {
  Disposable disposable =
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5)
          // データ通知時のログ
          .doOnNext(
              data -> System.out.println("doOnNext: data=" + data))
          // 完了時のログ
          .doOnComplete(() -> System.out.println("doOnComplete"))
          // エラー時のログ
          .doOnError(error -> System.out
              .println("doOnError: error=" + error))
          // 購読開始時のログ
          .doOnSubscribe(
              subscription -> System.out.println("doOnSubscribe"))
          // データ数のリクエスト時のログ
          .doOnRequest(size -> System.out
              .println("doOnRequest: size=" + size))
          // 購読解除時のログ
          .doOnCancel(() -> System.out.println("doOnCancel"))
          // 購読する
          .subscribeWith(new DebugSubscriber<>());
  
  // しばらく待った後に購読を解除する
  Thread.sleep(1000L);
  disposable.dispose();
  
  // しばらく待つ
  Thread.sleep(2000L);
}
実行結果
doOnSubscribe
doOnRequest: size=9223372036854775807
doOnNext: data=0
RxComputationThreadPool-1: 0
doOnNext: data=1
RxComputationThreadPool-1: 1
doOnCancel

 実行結果より処理の途中で明示的に購読解除を行うことでdoOnCancelメソッドが実行され、それ以降の通知が行われていないことがわかります。また、今回はSubscriberとしてDisposableSubscriberを実装したDebugSubscriberを使っているため、最初にLong.MAX_VALUEをrequestメソッドで渡しており、最初だけdoOnRequestメソッドが実行されていることがわかります。

まとめ

 今回はFlowableやObservableの便利なオペレータについて見ていきました。ここで見たのは代表的なもので、ここで紹介したもの以外にも数多くのオペレータが用意されています。RxJavaには他にもさまざまな機能を用意していますが、取りあえずは今まで見てきたオペレータだけでも多くのことができるようになります。RxJavaを学ぶには、まずはさまざまなオペレータを簡単なサンプルで試してみて、マーブルダイアグラムと比較するのが良いかと思います。

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
RxJavaによるリアクティブプログラミング入門連載記事一覧

もっと読む

この記事の著者

須田 智之(スダ トモユキ)

十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/9974 2017/02/23 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング