CodeZine(コードジン)

特集ページ一覧

RxJava(2.x)の最初に知っておいてもらいたいオペレータ

RxJavaによるリアクティブプログラミング入門(5)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2017/01/24 14:00

目次

通知するデータを制限するオペレータ

 Flowable/Observableが通知するデータの中には、場合によっては必要ないデータが含まれることがあります。このような不要なデータを受け取り、その受け取ったデータに対して変換などしても、最終的にそのデータを使って処理をしないことになるため無駄に時間とリソースを使ってしまうことになります。ここでは、このような不要なデータを通知せず、必要なデータのみを通知するためのオペレータについて見ていきます。このような通知するデータを制限する代表的なオペレータとして次のものがあります。

  • filter
  • take
  • skip
  • distinctUnitlChanged
  • throttleWithTimeout/debounce

 この他にも通知データの制限を行うさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

filter

マーブルダイアグラム
マーブルダイアグラム
メソッド
  • filter(Predicate<? super T>predicate)

 filterメソッドは受け取ったデータが条件に合うか判定し、結果がtrueのもののみを通知するオペレータです。条件の判定は引数の関数型インターフェースで行います。この引数のPredicateは通知されたデータを受け取り、そのデータを通知する場合はtrueを、通知しない場合はfalseを返すように実装します。この引数のPredicateはRxJavaのものでパッケージはio.reactivex.functionsになります。

predicateの実装例
// 偶数なら通知する
new Predicate<Long>() {
  
  @Override
  public boolean test(Long data) throws Exception {
    return data % 2 == 0;
  }
};

サンプル

 次のサンプルでは、intervalメソッドで生成したFlowableに対し、filterメソッドを使って偶数しか通知しないようにしています。

filter(predicate)のサンプル
public static void main(String[] args) throws Exception {
  Flowable<Long> flowable =
      // Flowableの生成
      Flowable.interval(300L, TimeUnit.MILLISECONDS)  // ①
          // 偶数のみ通知する
          .filter(data -> data % 2 == 0);  // ②
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(3000L);
}
  1. intervalメソッドで300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
  2. filterメソッドで通知されたデータが偶数ならtrueを返し、偶数のデータのみ通知するFlowableを生成。
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 6
RxComputationThreadPool-1: 8

 実行結果より、Predicateで定義したように偶数のデータのみ通知されていることが分かります。

take

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • take(long count)
  • take(long time, TimeUnit unit)

 takeメソッドは引数に指定したデータ数や期間に達するまで、受け取ったデータを通知するオペレータです。指定したデータ数や期間に達したら、完了を通知して処理を終了します。もし、指定した範囲が元の生産者が通知するデータ数より多い場合、元のデータをすべて通知して、完了します。例えば、空のFlowable/Observableに対してtakeメソッドを使ってもエラーにはならず、元の空のFlowable/Observableのように完了のみを通知します。

サンプル

 次のサンプルでは、intervalメソッドで生成したFlowableに対し、takeメソッドを使って3件までしかデータを通知しないようにしています。

take(count)のサンプル
public static void main(String[] args) throws Exception {
  Flowable<Long> flowable =
      // Flowableの生成
      Flowable.interval(1000L, TimeUnit.MILLISECONDS)
          // 3件まで通知する
          .take(3);
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(4000L);
}
  1. intervalメソッドで1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
  2. takeメソッドで最初の3件のデータを通知させ完了させる。
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 完了

 実行結果より最初の3件のデータのみ通知されていることが分かります。

skip

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • skip(long count)
  • skip(long time, TimeUnit unit)

 skipメソッドは最初に通知されるデータから指定した分だけのデータをのぞき、その後からのデータを通知するオペレータです。スキップする範囲はデータ数や経過時間を指定できます。もし、指定した範囲が元のFlowable/Observableが通知するデータ数より多い場合、データを通知することなく完了を通知して処理を終了します。例えば、空のFlowable/Observableに対してskipメソッドを使ってもエラーにはならず、元の空のFlowable/Observableのように完了のみを通知します。

サンプル

 次のサンプルでは、intervalメソッドで生成したFlowableに対し、skipメソッドを使って最初の2件のデータを通知しない(スキップする)ようにしています。

skip(count)のサンプル
public static void main(String[] args) throws Exception {
  Flowable<Long> flowable =
      // Flowableの生成
      Flowable.interval(1000L, TimeUnit.MILLISECONDS)
          // 最初の2件は通知しない
          .skip(2);
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(5000L);
}
  1. intervaleメソッドで1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
  2. skipメソッドで最初の2件を通知しないFlowableを生成。
実行結果
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4

 実行結果より最初の「0」「1」が通知されていないことが分かります。

distinctUnitlChanged

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • distinctUntilChanged()

 distinctUntilChangedメソッドは直前に通知したデータと等しいデータを連続して通知しようとしている場合にそのデータを除外して通知するオペレータです。例えば「A」を通知した後に再び「A」を通知しようとした場合、後者のデータは通知されません。ただし、重複していても連続していない場合は除外しません。

サンプル

 次のサンプルではjustメソッドで生成したFlowableに対し、distinctUntilChangedメソッドを使って、すでに通知したデータと連続して重複するデータを除いて通知しています。

distinctUntilChanged()のサンプル
public static void main(String[] args) {
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.just("A", "a", "a", "A", "a")
          // 連続して重複したデータを除いて通知する
          .distinctUntilChanged();
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
}
  1. justメソッドで引数のデータを通知するFlowableを生成。
  2. distinctUntilChangedメソッドで通知しているデータが連続して重複している場合、そのデータを通知しないFlowableを生成。
実行結果
main: A
main: a
main: A
main: a
main: 完了

 実行結果よりデータが連続して重複している場合、そのデータは通知されていないことが分かります。ただし、一度除外されたデータであっても、連続していない場合は通知されることが分かります。

throttleWithTimeout/debounce

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • throttleWithTimeout(long time, TimeUnit unit)
  • debounce(long time, TimeUnit unit)

 throttleWithTimeout/debounceメソッドは元のFlowable/Observableからデータを受け取った後、指定した期間内に別のデータを受け取らなければ、そのデータを通知することを繰り返すオペレータです。期間内に別のデータが来たら、次はそのデータから指定した期間に別のデータが来ないか計測します。ただし、指定した時間内であっても完了やエラーの通知は可能で、完了が通知される場合は最後に通知されるデータとともに完了を通知し、エラーの場合はエラーのみ通知します。

 また、throttleWithTimeoutメソッドもdebounceメソッドも引数が同じメソッドは、名前が違うだけで同じ処理を行います。

サンプル

 次のサンプルでは、createメソッドで生成したFlowableに対し、throttleLastメソッドを使って、データを受け取ってから500ミリ秒間に他のデータを受け取らない場合にそのデータを通知するようにしています。

throttleWithTimeout(time, unit)のサンプル
public static void main(String[] args) throws Exception {
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.<String> create(  // ①
          // 通知処理
          emitter -> {
            // データを通知し、しばらく待つ
            emitter.onNext("A");
            Thread.sleep(1000L);
            
            emitter.onNext("B");
            Thread.sleep(300L);
            
            emitter.onNext("C");
            Thread.sleep(300L);
            
            emitter.onNext("D");
            Thread.sleep(1000L);
            
            emitter.onNext("E");
            Thread.sleep(100L);
            
            // 完了を通知
            emitter.onComplete();
          }, BackpressureStrategy.BUFFER)
          // 指定した期間に次のデータが来なければ通知する
          .throttleWithTimeout(500L, TimeUnit.MILLISECONDS);  // ②
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
}
  1. createメソッドでデータを通知した後にしばらく待ってから、次のデータを通知するFlowableを生成する。
  2. throttleWithTimeoutメソッドでデータを通知してから500ミリ秒間次のデータを通知しないデータのみ通知するようにする。
実行結果
RxComputationThreadPool-1: A
RxComputationThreadPool-1: D
main: E
main: 完了

 実行結果より「A」を受け取った後の500ミリ秒間に次のデータを受け取らなかったので、「A」が通知できたことが分かります。続くデータ「B」「C」は通知後の500ミリ秒間に次のデータを受け取っているので、これらのデータは通知できません。続く「D」は受け取った後の500ミリ秒間に次のデータを受け取らなかったので「D」を通知することが可能になっています。続く「E」は、500ミリ秒待つ間に完了の通知を受け取っているので、500ミリ秒待つことなく、「E」と完了を通知していることが分かります。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5