SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

RxJavaによるリアクティブプログラミング入門

RxJava(2.x)の便利なオペレータ(結合/ユーティリティ/デバッグ)

RxJavaによるリアクティブプログラミング入門(6)

  • X ポスト
  • このエントリーをはてなブックマークに追加

複数のFlowable/Observableを結合するオペレータ(2)

combineLatest

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • combineLatest(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> combiner)
  • combineLatest(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, Publisher/ObservableSource<? extends T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> combiner)

と増えていき、引数となるFlowable/Observableを最大9つまで取るものが用意されています。

 「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。

 combineLatestメソッドは、複数のFlowable/Observableが通知するそれぞれのデータを引数の関数型インターフェースに渡すことで新たなデータを生成して通知するオペレータです。通知のタイミングは最初のデータのみ各Flowable/Observableがデータを通知するのを待って、それ以降は各Flowable/Observableがデータを通知するたびに、それまでに通知している最新のデータを使って新しいデータを生成していきます。完了を通知するタイミングは、全ての引数のFlowable/Observableが完了を通知したタイミングで完了するようになります。

 また、引数には結合するFlowable/Observableの他に、それぞれの受け取ったデータをどのように変換し新しいデータを生成するのかを定義する関数型インターフェースを持っています。この関数型インターフェースは受け取るFlowable/Observableの数によって、BiFunction、Function3、Function4、……と変わっていきます。

combinerの実装例(引数のFlowable/Observableが2つの場合)
// 引数のFlowable/Observableが通知したそれぞれのデータを格納したListを生成する
new BiFunction<Long, Long, List<Long>>() {
  
  @Override
  public List<Long> apply(Long data1, Long data2) throws Exception {
    return Arrays.asList(data1, data2);
  }
};

 ちなみに、引数のFlowable/Observableのどれかがエラーを通知したら、そのタイミングでエラーを通知し処理が終了します。もし、エラーの通知を保留して他の正常なFlowable/Observableを処理してから最後にエラーを通知したい場合は、combineLatestDelayErrorメソッドを使うことで対応することができます。

サンプル

 次のサンプルでは、intervalメソッドで生成した2つのFlowableを引数に取るcombineLatestメソッドを使って、元となるFlowableがデータを通知するたびに、その時点で最後に通知したデータを使って、新しいデータを生成し通知しています。今回はそれぞれのFlowableから受け取ったデータを格納したListを通知するようにしています。

combineLatest(source1, source2, combiner)のサンプル
public static void main(String[] args) throws Exception {
  // 結合対象
  Flowable<Long> flowable1 =  // (1)
      // 300ミリ秒ごとにデータを通知する
      Flowable.interval(300L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5);
  
  // 結合対象
  Flowable<Long> flowable2 =  // (2)
      // 500ミリ秒ごとにデータを通知する
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 3件まで
          .take(3)
          // 100加算する
          .map(data -> data + 100L);
  
  // 複数のFlowableから受け取ったデータで新しいデータを生成する
  Flowable<List<Long>> result = Flowable.combineLatest(  // (3)
      // 結合するFlowable
      flowable1,
      // 結合するFlowable
      flowable2,
      // 引数より通知されたデータをListに格納して通知
      (data1, data2) -> Arrays.asList(data1, data2));
  
  // 購読する
  result.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
  2. intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って3件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」「102」を通知するFlowableになる
  3. combineLatestメソッドを使って(1)と(2)のFlowableが通知したデータを受け取り、それらのデータを格納したListを、結果として通知する新しいデータとして生成する
実行結果
RxComputationThreadPool-2: [0, 100]
RxComputationThreadPool-1: [1, 100]
RxComputationThreadPool-1: [2, 100]
RxComputationThreadPool-2: [2, 101]
RxComputationThreadPool-1: [3, 101]
RxComputationThreadPool-1: [4, 101]
RxComputationThreadPool-2: [4, 102]
RxComputationThreadPool-2: 完了

 実行結果より、最初のデータのみ両方のFlowableがデータを通知するまで待ち、それ以降は各Flowableがデータを通知するたびに、その時点での引数が通知する最新のデータを使って、結果として通知するデータを生成していることがわかります。また、完了のタイミングは各Flowableのすべてのデータを通知していることより、最後にデータを通知したFlowableが完了したタイミングで実行されていることがわかります。

zip

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • zip(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> zipper)
  • zip(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, Publisher/ObservableSource<? extends T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> zipper)

と増えていき、引数となるFlowable/Observableを最大9つまで取るものが用意されています。

 「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。

 zipメソッドは複数のFlowable/Observableが通知するそれぞれのデータを引数の関数型インターフェースに渡すことで新たなデータを生成して通知するオペレータです。通知のタイミングは各Flowable/Observableのデータがそろうのを待ってから新しいデータを生成するため、通知ペースが遅いFlowable/Observableに合わせることになります。完了を通知するタイミングは、データ数が最も少ないFlowable/Observableが完了したタイミングになります。

 また、引数には、結合するFlowable/Observableの他に、それぞれの受け取ったデータをどのように変換し新しいデータを生成するのかを定義する関数型インターフェースを持っています。この関数型インターフェースは受け取るFlowable/Observableの数によって、BiFunction、Function3、Function4、……と変わっていきます。

zipperの実装例(引数のFlowable/Observableが2つの場合)
// 引数のFlowable/Observableが通知したそれぞれのデータを格納したListを生成する
new BiFunction<Long, Long, List<Long>>() {
  
  @Override
  public List<Long> apply(Long data1, Long data2) throws Exception {
    return Arrays.asList(data1, data2);
  }
};

サンプル

 次のサンプルでは、intervalメソッドで生成した2つのFlowableを引数に取るzipメソッドを使って、元となる各Flowableのデータがそろったタイミングで新しいデータを生成し通知しています。今回はそれぞれのFlowableから受け取ったデータを格納したListを通知するようにしています。

zip(source1, source2, zipper)のサンプル
public static void main(String[] args) throws Exception {
  // 結合対象
  Flowable<Long> flowable1 =  // (1)
      // 300ミリ秒ごとにデータを通知する
      Flowable.interval(300L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5);
  
  // 結合対象
  Flowable<Long> flowable2 =  // (2)
      // 500ミリ秒ごとにデータを通知する
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 3件まで
          .take(3)
          // 100加算する
          .map(data -> data + 100L);
  
  // 複数のFlowableから受け取ったデータで新しいデータを生成する
  Flowable<List<Long>> result = Flowable.zip(  // (3)
      // 結合するFlowable
      flowable1,
      // 結合するFlowable
      flowable2,
      // 引数より通知されたデータをListに格納して通知
      (data1, data2) -> Arrays.asList(data1, data2));
  
  // 購読する
  result.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
  2. intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って3件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」「102」を通知するFlowableになる
  3. zipメソッドを使って(1)と(2)のFlowableが通知したデータを受け取り、それらのデータを格納したListを、結果として通知する新しいデータとして生成する
実行結果
RxComputationThreadPool-2: [0, 100]
RxComputationThreadPool-2: [1, 101]
RxComputationThreadPool-2: [2, 102]
RxComputationThreadPool-2: 完了

 実行結果より、引数全てのFlowableの通知データがそろったタイミングで新しいデータを生成し、そのデータを通知していることがわかります。また、完了のタイミングは3件しか生成されていないことから、最もデータ数が少ないFlowableが完了したタイミングで実行されていることがわかります。逆に言うと、データ数が多いFlowableが持つ超過したデータは通知されることはありません。

次のページ
ユーティリティ系のオペレータ

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
RxJavaによるリアクティブプログラミング入門連載記事一覧

もっと読む

この記事の著者

須田 智之(スダ トモユキ)

十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/9974 2017/02/23 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング