Shoeisha Technology Media

CodeZine(コードジン)

記事種別から探す

RxJava(2.x)の便利なオペレータ(結合/ユーティリティ/デバッグ)

RxJavaによるリアクティブプログラミング入門(6)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2017/02/23 14:00

 この連載はRxJavaを使って、リアクティブプログラミングにおけるポイントやRxJavaが持つ機能について学んでいくことを目的としています。前回はRxJava 2.xの最も基本となるオペレータについて解説しました。今回は前回紹介していない複数のFlowable/Observableを結合するオペレータとユーティリティ系のオペレータ、そしてデバッグ用(ログ出力)に使えるオペレータの中でも代表的なものについて見ていきます。今回もサンプルはFlowableを使って実装していますが、Observableの場合もバックプレッシャーを扱わない限り、基本的にはほぼ同じ使い方になります。また、データを受け取るSubscriberとして、特別なことをしない限り、第5回で作成したDebugSubscriberを使用しています。

目次

対象読者

  • Java経験者(初心者可)
  • RxJava未経験者
  • リアクティブプログラミング未経験者

 ※ただし、前回までの連載を読んでいる前提です。

複数のFlowable/Observableを結合するオペレータ(1)

 複数のFlowable/Observableを結合するオペレータは大きく分けて2つの種類があり、一つは単に結合してそれぞれのデータを通知するもの、もう一つは各Flowable/Observableが通知したデータを組み合わせて新しいデータを生成してから通知するものです。ここではこれらの代表的なオペレータとして次のものを見ていきます。

  • merge
  • combineLatest
  • zip

 この他にも複数のFlowable/Observableを結合するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

merge

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2)
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2, Publisher/ObservableSource<? extends T> source3)
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2, Publisher/ObservableSource<? extends T> source3, Publisher/ObservableSource<? extends T> source4)

 「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。

 mergeメソッドは複数のFlowable/Observableが通知するデータを1つのFlowable/Observableを通して通知するオペレータです。このメソッドを使うことで、複数のFlowable/Observableの通知を1つのSubscriber/Observerに購読させることができるようになります。処理を開始する際は引数のFlowable/Observableを同時に実行させ、それぞれがデータを通知するタイミングで結果のFlowable/Observableからデータが通知されます。ただし、複数のFlowable/Observableが同時にデータを通知しても、結果として通知する際は同期が取られシーケンシャルに通知されるようになっています。そして、完了を通知するタイミングは引数の全てのFlowable/Observableが完了する時になります。mergeメソッドは引数に最大4つまでのFlowable/Observableを取りますが、それより多くのFlowable/Observableを結合したい場合は、引数を配列で取るmergeArrayメソッドを使うことで対応することができます。

 ちなみに、引数のFlowable/Observableのどれかがエラーを通知したら、そのタイミングでエラーを通知し処理が終了します。もし、エラーの通知を保留して他の正常なFlowable/Observableを処理してから最後にエラーを通知したい場合は、mergeDelayErrorメソッドを使うことで対応することができます。

サンプル

 次のサンプルではintervalメソッドで生成した2つのFlowableについて、mergeメソッドを使って1つのFlowableにまとめ、そのFlowableを経由してデータを通知するようにしています。また、引数となるそれぞれのFlowableは通知する間隔やデータ数が異なるようになっています。このmergeメソッドで生成されたFlowableを実行すると引数のFlowableを全て同時に実行され、引数のFlowableを持つデータがmergeメソッドの結果となるFlowableから通知されることになります。

merge(source1, source2)のサンプル
public static void main(String[] args) throws Exception {
  // マージ対象
  Flowable<Long> flowable1 =  // (1)
      // 300ミリ秒ごとにデータを通知する 
      Flowable.interval(300L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5);
  
  // マージ対象
  Flowable<Long> flowable2 =  // (2)
      // 500ミリ秒ごとにデータを通知する
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 2件まで
          .take(2)
          // 100加算する
          .map(data -> data + 100L);
  
  // 複数のFlowableをマージする
  Flowable<Long> result = Flowable.merge(flowable1, flowable2); // (3)
  
  // 購読する
  result.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
  2. intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って2件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」を通知するFlowableになる
  3. mergeメソッドを使って(1)と(2)のFlowableを1つにまとめ、同時に実行するようにする
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-2: 100
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-2: 101
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 完了

 実行結果より、マージ元のFlowableが同時に処理が実行されていることがわかります。さらに、(2)のFlowableが完了した後も(1)のデータを通知していることより、全てのFlowableが完了を通知するまで全てのデータが通知されることがわかります。

経過時間と通知されるデータ
経過時間 0 300 500 600 900 1000 1200 1500
(1)のFlowable   0   1 2   3 4
(2)のFlowable     100     101    
結果のデータ   0 100 1 2 101 3 4

  • LINEで送る
  • このエントリーをはてなブックマークに追加

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門
All contents copyright © 2006-2017 Shoeisha Co., Ltd. All rights reserved. ver.1.5