ユーティリティ系のオペレータ
今まで基本となるFlowable/Observableの生成からデータの選択や変換をするオペレータ、そして先ほどは複数のFlowable/Observableの結合するオペレータを見てきましたが、それ以外の処理も行いたいことがあります。ここではその他の便利に使えるオペレータをユーティリティ系のオペレータとし代表的なものとして次のものを見ていきます。
- repeat
- delay
この他にもFlowable/Observableを操作するさまざまなユーティリティ系オペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。
またこの中にある「do」で始まるオペレータについては、デバッグ用として後述します。
repeat
主なメソッド
-
repeat
-
repeat(long times)
repeatメソッドは元のFlowable/Observableの処理が完了する際にデータの通知を最初から繰り返し行うようにするオペレータです。例えば元のFlowable/Observableが「1」「2」「3」とデータを通知し完了する場合、repeatメソッドを使うことで「1」「2」「3」「1」「2」「3」……と繰り返しデータを通知するようになります。repeatメソッドの引数がないものは完了することなく繰り返しデータを通知し続け、引数があるものは繰り返しの回数を指定できます。例えば、「A」「B」「C」と通知するFlowableに対し、repeat(2)を使うと「A」「B」「C」「A」「B」「C」と通知し、その後に完了を通知します。
また、引数に0未満の数値を渡すと、IllegalArgumentExceptionが発生し、引数に「0」を渡すと何もデータを通知せず完了だけ通知する空のFlowable/Observableになります。
サンプル
次のサンプルではjustメソッド生成したFlowableに対し、repeatメソッドを使って、データの通知が2回行われるようにしています。
public static void main(String[] args) { Flowable<String> flowable = // Flowableの生成 Flowable.just("A", "B", "C") // (1) // 通知を繰り返す .repeat(2); // (2) // 購読する flowable.subscribe(new DebugSubscriber<>()); }
- justメソッドを使って「A」「B」「C」を通知するFlowableを生成
- repeatメソッドを使って、データ通知を2回繰り返すようにする
main: A main: B main: C main: A main: B main: C main: 完了
実行結果より、データ通知が2回行われていることがわかります。
delay
主なメソッド
-
delay(long time, TimeUnit unit)
delayメソッドはFlowable/Observableから受け取ったデータを指定した期間だけ遅らせて通知するオペレータです。遅らせる期間は引数に時間を渡すことで指定できます。また、ここでは紹介しませんが、delayメソッドの中には引数に関数型インターフェースを取るものもあり、その場合は関数型インターフェースから生成されたFlowable/Observableがデータを通知するタイミングで結果のデータ通知を遅らせるように指定することができます。
ちなみに、delayメソッドとよく似たオペレータとしてdelaySubscriptionメソッドがあります。このオペレータはdelayメソッドとは異なり、データの通知を遅らせるのではなく、指定した期間だけ処理の開始を遅らせ、データは生成したらすぐに通知するようになっています。
サンプル
次のサンプルでは、justメソッドで生成したFlowableに対し、delayメソッドを使って、2000ミリ秒遅らせてデータを通知するようにしています。また、処理の開始時と購読およびデータ通知時に、確認のためシステム時間を出力しています。
public static void main(String[] args) throws Exception { // 処理の開始時間の出力 System.out.println("処理開始: " + System.currentTimeMillis()); Flowable<String> flowable = // Flowableの生成 Flowable.<String> create(emitter -> { // (1) // 購読の開始時間の出力 System.out.println("購読開始: " + System.currentTimeMillis()); // データの通知 emitter.onNext("A"); emitter.onNext("B"); emitter.onNext("C"); // 完了の通知 emitter.onComplete(); }, BackpressureStrategy.BUFFER) // 通知を遅らせる .delay(2000L, TimeUnit.MILLISECONDS); // (2) // 購読する flowable.subscribe(data -> { // (3) System.out.println( "通知時間: " + System.currentTimeMillis() + ": " + data); }); // しばらく待つ Thread.sleep(3000L); }
- createメソッドを使って「A」「B」「C」と通知するFlowableを生成。購読開始時にシステム時間を出力する
- delayメソッドで通知するタイミングを2000ミリ秒遅らせる
- 結果としてデータを受け取った際にその時にシステム時間とデータを出力する
処理開始: 1485837554484 購読開始: 1485837555045 通知時間: 1485837557058: A 通知時間: 1485837557058: B 通知時間: 1485837557058: C
実行結果より、「購読開始」(……5045)と「通知時間」(……7058)の時間差が約2000ミリ秒あることにより、delayメソッドによって通知データが指定した時間だけ遅れて通知されていることがわかります。また、「処理開始」と「購読開始」の時間の差は引数の指定した時間ほど遅れていないので、購読自体に影響を与えているわけではないことがわかります。