通知するデータを制限するオペレータ
Flowable/Observableが通知するデータの中には、場合によっては必要ないデータが含まれることがあります。このような不要なデータを受け取り、その受け取ったデータに対して変換などしても、最終的にそのデータを使って処理をしないことになるため無駄に時間とリソースを使ってしまうことになります。ここでは、このような不要なデータを通知せず、必要なデータのみを通知するためのオペレータについて見ていきます。このような通知するデータを制限する代表的なオペレータとして次のものがあります。
- filter
- take
- skip
- distinctUnitlChanged
- throttleWithTimeout/debounce
この他にも通知データの制限を行うさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。
filter
メソッド
-
filter(Predicate<? super T>predicate)
filterメソッドは受け取ったデータが条件に合うか判定し、結果がtrueのもののみを通知するオペレータです。条件の判定は引数の関数型インターフェースで行います。この引数のPredicateは通知されたデータを受け取り、そのデータを通知する場合はtrueを、通知しない場合はfalseを返すように実装します。この引数のPredicateはRxJavaのものでパッケージはio.reactivex.functions
になります。
// 偶数なら通知する new Predicate<Long>() { @Override public boolean test(Long data) throws Exception { return data % 2 == 0; } };
サンプル
次のサンプルでは、intervalメソッドで生成したFlowableに対し、filterメソッドを使って偶数しか通知しないようにしています。
public static void main(String[] args) throws Exception { Flowable<Long> flowable = // Flowableの生成 Flowable.interval(300L, TimeUnit.MILLISECONDS) // ① // 偶数のみ通知する .filter(data -> data % 2 == 0); // ② // 購読する flowable.subscribe(new DebugSubscriber<>()); // しばらく待つ Thread.sleep(3000L); }
- intervalメソッドで300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
- filterメソッドで通知されたデータが偶数ならtrueを返し、偶数のデータのみ通知するFlowableを生成。
RxComputationThreadPool-1: 0 RxComputationThreadPool-1: 2 RxComputationThreadPool-1: 4 RxComputationThreadPool-1: 6 RxComputationThreadPool-1: 8
実行結果より、Predicateで定義したように偶数のデータのみ通知されていることが分かります。
take
主なメソッド
-
take(long count)
-
take(long time, TimeUnit unit)
takeメソッドは引数に指定したデータ数や期間に達するまで、受け取ったデータを通知するオペレータです。指定したデータ数や期間に達したら、完了を通知して処理を終了します。もし、指定した範囲が元の生産者が通知するデータ数より多い場合、元のデータをすべて通知して、完了します。例えば、空のFlowable/Observableに対してtakeメソッドを使ってもエラーにはならず、元の空のFlowable/Observableのように完了のみを通知します。
サンプル
次のサンプルでは、intervalメソッドで生成したFlowableに対し、takeメソッドを使って3件までしかデータを通知しないようにしています。
public static void main(String[] args) throws Exception { Flowable<Long> flowable = // Flowableの生成 Flowable.interval(1000L, TimeUnit.MILLISECONDS) // 3件まで通知する .take(3); // 購読する flowable.subscribe(new DebugSubscriber<>()); // しばらく待つ Thread.sleep(4000L); }
- intervalメソッドで1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
- takeメソッドで最初の3件のデータを通知させ完了させる。
RxComputationThreadPool-1: 0 RxComputationThreadPool-1: 1 RxComputationThreadPool-1: 2 RxComputationThreadPool-1: 完了
実行結果より最初の3件のデータのみ通知されていることが分かります。
skip
主なメソッド
-
skip(long count)
-
skip(long time, TimeUnit unit)
skipメソッドは最初に通知されるデータから指定した分だけのデータをのぞき、その後からのデータを通知するオペレータです。スキップする範囲はデータ数や経過時間を指定できます。もし、指定した範囲が元のFlowable/Observableが通知するデータ数より多い場合、データを通知することなく完了を通知して処理を終了します。例えば、空のFlowable/Observableに対してskipメソッドを使ってもエラーにはならず、元の空のFlowable/Observableのように完了のみを通知します。
サンプル
次のサンプルでは、intervalメソッドで生成したFlowableに対し、skipメソッドを使って最初の2件のデータを通知しない(スキップする)ようにしています。
public static void main(String[] args) throws Exception { Flowable<Long> flowable = // Flowableの生成 Flowable.interval(1000L, TimeUnit.MILLISECONDS) // 最初の2件は通知しない .skip(2); // 購読する flowable.subscribe(new DebugSubscriber<>()); // しばらく待つ Thread.sleep(5000L); }
- intervaleメソッドで1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
- skipメソッドで最初の2件を通知しないFlowableを生成。
RxComputationThreadPool-1: 2 RxComputationThreadPool-1: 3 RxComputationThreadPool-1: 4
実行結果より最初の「0」「1」が通知されていないことが分かります。
distinctUnitlChanged
主なメソッド
-
distinctUntilChanged()
distinctUntilChangedメソッドは直前に通知したデータと等しいデータを連続して通知しようとしている場合にそのデータを除外して通知するオペレータです。例えば「A」を通知した後に再び「A」を通知しようとした場合、後者のデータは通知されません。ただし、重複していても連続していない場合は除外しません。
サンプル
次のサンプルではjustメソッドで生成したFlowableに対し、distinctUntilChangedメソッドを使って、すでに通知したデータと連続して重複するデータを除いて通知しています。
public static void main(String[] args) { Flowable<String> flowable = // Flowableの生成 Flowable.just("A", "a", "a", "A", "a") // 連続して重複したデータを除いて通知する .distinctUntilChanged(); // 購読する flowable.subscribe(new DebugSubscriber<>()); }
- justメソッドで引数のデータを通知するFlowableを生成。
- distinctUntilChangedメソッドで通知しているデータが連続して重複している場合、そのデータを通知しないFlowableを生成。
main: A main: a main: A main: a main: 完了
実行結果よりデータが連続して重複している場合、そのデータは通知されていないことが分かります。ただし、一度除外されたデータであっても、連続していない場合は通知されることが分かります。
throttleWithTimeout/debounce
主なメソッド
-
throttleWithTimeout(long time, TimeUnit unit)
-
debounce(long time, TimeUnit unit)
throttleWithTimeout/debounceメソッドは元のFlowable/Observableからデータを受け取った後、指定した期間内に別のデータを受け取らなければ、そのデータを通知することを繰り返すオペレータです。期間内に別のデータが来たら、次はそのデータから指定した期間に別のデータが来ないか計測します。ただし、指定した時間内であっても完了やエラーの通知は可能で、完了が通知される場合は最後に通知されるデータとともに完了を通知し、エラーの場合はエラーのみ通知します。
また、throttleWithTimeoutメソッドもdebounceメソッドも引数が同じメソッドは、名前が違うだけで同じ処理を行います。
サンプル
次のサンプルでは、createメソッドで生成したFlowableに対し、throttleLastメソッドを使って、データを受け取ってから500ミリ秒間に他のデータを受け取らない場合にそのデータを通知するようにしています。
public static void main(String[] args) throws Exception { Flowable<String> flowable = // Flowableの生成 Flowable.<String> create( // ① // 通知処理 emitter -> { // データを通知し、しばらく待つ emitter.onNext("A"); Thread.sleep(1000L); emitter.onNext("B"); Thread.sleep(300L); emitter.onNext("C"); Thread.sleep(300L); emitter.onNext("D"); Thread.sleep(1000L); emitter.onNext("E"); Thread.sleep(100L); // 完了を通知 emitter.onComplete(); }, BackpressureStrategy.BUFFER) // 指定した期間に次のデータが来なければ通知する .throttleWithTimeout(500L, TimeUnit.MILLISECONDS); // ② // 購読する flowable.subscribe(new DebugSubscriber<>()); }
- createメソッドでデータを通知した後にしばらく待ってから、次のデータを通知するFlowableを生成する。
- throttleWithTimeoutメソッドでデータを通知してから500ミリ秒間次のデータを通知しないデータのみ通知するようにする。
RxComputationThreadPool-1: A RxComputationThreadPool-1: D main: E main: 完了
実行結果より「A」を受け取った後の500ミリ秒間に次のデータを受け取らなかったので、「A」が通知できたことが分かります。続くデータ「B」「C」は通知後の500ミリ秒間に次のデータを受け取っているので、これらのデータは通知できません。続く「D」は受け取った後の500ミリ秒間に次のデータを受け取らなかったので「D」を通知することが可能になっています。続く「E」は、500ミリ秒待つ間に完了の通知を受け取っているので、500ミリ秒待つことなく、「E」と完了を通知していることが分かります。