デバッグ用のオペレータ(2)
doOnCancel/doOnDispose
Flowableのメソッド
-
doOnCancel(Action onCancel)
Observableのメソッド
-
doOnDispose(Action onDispose)
doOnCancelメソッドはFlowableの購読が解除された際に、doOnDisposeメソッドはObservableの購読が解除された際に、引数に指定した関数型インターフェースの処理を行わせるメソッドです。ただし、doOnCancel/doOnDisposeメソッドは途中で購読解除されたのではなく、完了やエラーで終了した場合は実行されないので注意が必要です。また、実装する、関数型インターフェースのメソッドの引数には何も渡されません。
「do」の名前で始まるオペレータを使ったサンプル
正常終了時のサンプル
次のサンプルではintervalメソッドで生成したFlowableに対し、takeメソッドを使って3件まで出力するようにしています。そのFlowableに対し、各通知に対する「do」で名前が始まるメソッドを使って通知時のログを出力するようにしています。
また、データを受け取り処理をするSubscriberでは、1件ずつデータを通知するようにリクエストをし、各通知を受ける度にこちらでもログを出力するようにしています。
public static void main(String[] args) throws Exception { Flowable.interval(500L, TimeUnit.MILLISECONDS) // 3件まで .take(3) // データ通知時のログ .doOnNext( data -> System.out.println("doOnNext: data=" + data)) // 完了時のログ .doOnComplete(() -> System.out.println("doOnComplete")) // エラー時のログ .doOnError( error -> System.out.println("doOnError: error=" + error)) // 購読開始時のログ .doOnSubscribe( subscription -> System.out.println("doOnSubscribe")) // データ数のリクエスト時のログ .doOnRequest( size -> System.out.println("doOnRequest: size=" + size)) // 購読解除時のログ .doOnCancel(() -> System.out.println("doOnCancel")) // 購読する .subscribe(new Subscriber<Long>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { System.out.println("購読開始"); this.subscription = subscription; subscription.request(1L); } @Override public void onNext(Long data) { System.out.println("データ=" + data); subscription.request(1L); } @Override public void onError(Throwable error) { System.out.println("エラー=" + error); } @Override public void onComplete() { System.out.println("完了"); } }); // しばらく待つ Thread.sleep(2000L); }
doOnSubscribe 購読開始 doOnRequest: size=1 doOnNext: data=0 データ=0 doOnRequest: size=1 doOnNext: data=1 データ=1 doOnRequest: size=1 doOnNext: data=2 データ=2 doOnRequest: size=1 doOnComplete 完了
実行結果より、まずdoOnSubscribeメソッドが実行された後にSubscriberのonSubscribeメソッドが呼ばれていることがわかります。そして、そこでデータ数のリクエストを行うとdoOnRequestメソッドが実行されています。そして、データを通知する際にdoOnNextメソッドが呼ばれ、その後にSubscriberがデータを受け取り、SubscriberのonNextメソッドを実行していることがわかります。それを繰り返し、全てのデータを通知し終えたら、doOnCompleteメソッドが実行され、SubscriberのonCompleteメソッドが実行されていることがわかります。
また、doOnCancelメソッドが実行されていないことから、完了時にはdoOnCancelメソッドが呼ばれないことがわかります。
購読解除時のサンプル
次のサンプルではintervalメソッドで生成したFlowableに対し、takeメソッドで5件まで出力するようにしています。そのFlowableに対し、各通知に対する「do」で名前が始まるメソッドを使って通知時のログを出力するようにしています。ただし、途中で購読を解除するようにしています。
public static void main(String[] args) throws Exception { Disposable disposable = Flowable.interval(500L, TimeUnit.MILLISECONDS) // 5件まで .take(5) // データ通知時のログ .doOnNext( data -> System.out.println("doOnNext: data=" + data)) // 完了時のログ .doOnComplete(() -> System.out.println("doOnComplete")) // エラー時のログ .doOnError(error -> System.out .println("doOnError: error=" + error)) // 購読開始時のログ .doOnSubscribe( subscription -> System.out.println("doOnSubscribe")) // データ数のリクエスト時のログ .doOnRequest(size -> System.out .println("doOnRequest: size=" + size)) // 購読解除時のログ .doOnCancel(() -> System.out.println("doOnCancel")) // 購読する .subscribeWith(new DebugSubscriber<>()); // しばらく待った後に購読を解除する Thread.sleep(1000L); disposable.dispose(); // しばらく待つ Thread.sleep(2000L); }
doOnSubscribe doOnRequest: size=9223372036854775807 doOnNext: data=0 RxComputationThreadPool-1: 0 doOnNext: data=1 RxComputationThreadPool-1: 1 doOnCancel
実行結果より処理の途中で明示的に購読解除を行うことでdoOnCancelメソッドが実行され、それ以降の通知が行われていないことがわかります。また、今回はSubscriberとしてDisposableSubscriberを実装したDebugSubscriberを使っているため、最初にLong.MAX_VALUEをrequestメソッドで渡しており、最初だけdoOnRequestメソッドが実行されていることがわかります。
まとめ
今回はFlowableやObservableの便利なオペレータについて見ていきました。ここで見たのは代表的なもので、ここで紹介したもの以外にも数多くのオペレータが用意されています。RxJavaには他にもさまざまな機能を用意していますが、取りあえずは今まで見てきたオペレータだけでも多くのことができるようになります。RxJavaを学ぶには、まずはさまざまなオペレータを簡単なサンプルで試してみて、マーブルダイアグラムと比較するのが良いかと思います。