対象読者
- Java経験者(初心者可)
- RxJava未経験者
- リアクティブプログラミング未経験者
※ただし、前回までの連載を読んでいる前提です。
複数のFlowable/Observableを結合するオペレータ(1)
複数のFlowable/Observableを結合するオペレータは大きく分けて2つの種類があり、一つは単に結合してそれぞれのデータを通知するもの、もう一つは各Flowable/Observableが通知したデータを組み合わせて新しいデータを生成してから通知するものです。ここではこれらの代表的なオペレータとして次のものを見ていきます。
- merge
- combineLatest
- zip
この他にも複数のFlowable/Observableを結合するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。
merge
主なメソッド
-
merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2)
-
merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2, Publisher/ObservableSource<? extends T> source3)
-
merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2, Publisher/ObservableSource<? extends T> source3, Publisher/ObservableSource<? extends T> source4)
注
「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。
mergeメソッドは複数のFlowable/Observableが通知するデータを1つのFlowable/Observableを通して通知するオペレータです。このメソッドを使うことで、複数のFlowable/Observableの通知を1つのSubscriber/Observerに購読させることができるようになります。処理を開始する際は引数のFlowable/Observableを同時に実行させ、それぞれがデータを通知するタイミングで結果のFlowable/Observableからデータが通知されます。ただし、複数のFlowable/Observableが同時にデータを通知しても、結果として通知する際は同期が取られシーケンシャルに通知されるようになっています。そして、完了を通知するタイミングは引数の全てのFlowable/Observableが完了する時になります。mergeメソッドは引数に最大4つまでのFlowable/Observableを取りますが、それより多くのFlowable/Observableを結合したい場合は、引数を配列で取るmergeArrayメソッドを使うことで対応することができます。
ちなみに、引数のFlowable/Observableのどれかがエラーを通知したら、そのタイミングでエラーを通知し処理が終了します。もし、エラーの通知を保留して他の正常なFlowable/Observableを処理してから最後にエラーを通知したい場合は、mergeDelayErrorメソッドを使うことで対応することができます。
サンプル
次のサンプルではintervalメソッドで生成した2つのFlowableについて、mergeメソッドを使って1つのFlowableにまとめ、そのFlowableを経由してデータを通知するようにしています。また、引数となるそれぞれのFlowableは通知する間隔やデータ数が異なるようになっています。このmergeメソッドで生成されたFlowableを実行すると引数のFlowableを全て同時に実行され、引数のFlowableを持つデータがmergeメソッドの結果となるFlowableから通知されることになります。
public static void main(String[] args) throws Exception { // マージ対象 Flowable<Long> flowable1 = // (1) // 300ミリ秒ごとにデータを通知する Flowable.interval(300L, TimeUnit.MILLISECONDS) // 5件まで .take(5); // マージ対象 Flowable<Long> flowable2 = // (2) // 500ミリ秒ごとにデータを通知する Flowable.interval(500L, TimeUnit.MILLISECONDS) // 2件まで .take(2) // 100加算する .map(data -> data + 100L); // 複数のFlowableをマージする Flowable<Long> result = Flowable.merge(flowable1, flowable2); // (3) // 購読する result.subscribe(new DebugSubscriber<>()); // しばらく待つ Thread.sleep(2000L); }
- intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
- intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って2件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」を通知するFlowableになる
- mergeメソッドを使って(1)と(2)のFlowableを1つにまとめ、同時に実行するようにする
RxComputationThreadPool-1: 0 RxComputationThreadPool-2: 100 RxComputationThreadPool-1: 1 RxComputationThreadPool-1: 2 RxComputationThreadPool-2: 101 RxComputationThreadPool-1: 3 RxComputationThreadPool-1: 4 RxComputationThreadPool-1: 完了
実行結果より、マージ元のFlowableが同時に処理が実行されていることがわかります。さらに、(2)のFlowableが完了した後も(1)のデータを通知していることより、全てのFlowableが完了を通知するまで全てのデータが通知されることがわかります。
経過時間 | 0 | 300 | 500 | 600 | 900 | 1000 | 1200 | 1500 |
---|---|---|---|---|---|---|---|---|
(1)のFlowable | 0 | 1 | 2 | 3 | 4 | |||
(2)のFlowable | 100 | 101 | ||||||
結果のデータ | 0 | 100 | 1 | 2 | 101 | 3 | 4 |