複数のFlowable/Observableを結合するオペレータ(2)
combineLatest
主なメソッド
-
combineLatest(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> combiner)
-
combineLatest(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, Publisher/ObservableSource<? extends T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> combiner)
と増えていき、引数となるFlowable/Observableを最大9つまで取るものが用意されています。
注
「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。
combineLatestメソッドは、複数のFlowable/Observableが通知するそれぞれのデータを引数の関数型インターフェースに渡すことで新たなデータを生成して通知するオペレータです。通知のタイミングは最初のデータのみ各Flowable/Observableがデータを通知するのを待って、それ以降は各Flowable/Observableがデータを通知するたびに、それまでに通知している最新のデータを使って新しいデータを生成していきます。完了を通知するタイミングは、全ての引数のFlowable/Observableが完了を通知したタイミングで完了するようになります。
また、引数には結合するFlowable/Observableの他に、それぞれの受け取ったデータをどのように変換し新しいデータを生成するのかを定義する関数型インターフェースを持っています。この関数型インターフェースは受け取るFlowable/Observableの数によって、BiFunction、Function3、Function4、……と変わっていきます。
// 引数のFlowable/Observableが通知したそれぞれのデータを格納したListを生成する new BiFunction<Long, Long, List<Long>>() { @Override public List<Long> apply(Long data1, Long data2) throws Exception { return Arrays.asList(data1, data2); } };
ちなみに、引数のFlowable/Observableのどれかがエラーを通知したら、そのタイミングでエラーを通知し処理が終了します。もし、エラーの通知を保留して他の正常なFlowable/Observableを処理してから最後にエラーを通知したい場合は、combineLatestDelayErrorメソッドを使うことで対応することができます。
サンプル
次のサンプルでは、intervalメソッドで生成した2つのFlowableを引数に取るcombineLatestメソッドを使って、元となるFlowableがデータを通知するたびに、その時点で最後に通知したデータを使って、新しいデータを生成し通知しています。今回はそれぞれのFlowableから受け取ったデータを格納したListを通知するようにしています。
public static void main(String[] args) throws Exception { // 結合対象 Flowable<Long> flowable1 = // (1) // 300ミリ秒ごとにデータを通知する Flowable.interval(300L, TimeUnit.MILLISECONDS) // 5件まで .take(5); // 結合対象 Flowable<Long> flowable2 = // (2) // 500ミリ秒ごとにデータを通知する Flowable.interval(500L, TimeUnit.MILLISECONDS) // 3件まで .take(3) // 100加算する .map(data -> data + 100L); // 複数のFlowableから受け取ったデータで新しいデータを生成する Flowable<List<Long>> result = Flowable.combineLatest( // (3) // 結合するFlowable flowable1, // 結合するFlowable flowable2, // 引数より通知されたデータをListに格納して通知 (data1, data2) -> Arrays.asList(data1, data2)); // 購読する result.subscribe(new DebugSubscriber<>()); // しばらく待つ Thread.sleep(2000L); }
- intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
- intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って3件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」「102」を通知するFlowableになる
- combineLatestメソッドを使って(1)と(2)のFlowableが通知したデータを受け取り、それらのデータを格納したListを、結果として通知する新しいデータとして生成する
RxComputationThreadPool-2: [0, 100] RxComputationThreadPool-1: [1, 100] RxComputationThreadPool-1: [2, 100] RxComputationThreadPool-2: [2, 101] RxComputationThreadPool-1: [3, 101] RxComputationThreadPool-1: [4, 101] RxComputationThreadPool-2: [4, 102] RxComputationThreadPool-2: 完了
実行結果より、最初のデータのみ両方のFlowableがデータを通知するまで待ち、それ以降は各Flowableがデータを通知するたびに、その時点での引数が通知する最新のデータを使って、結果として通知するデータを生成していることがわかります。また、完了のタイミングは各Flowableのすべてのデータを通知していることより、最後にデータを通知したFlowableが完了したタイミングで実行されていることがわかります。
zip
主なメソッド
-
zip(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> zipper)
-
zip(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, Publisher/ObservableSource<? extends T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> zipper)
と増えていき、引数となるFlowable/Observableを最大9つまで取るものが用意されています。
注
「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。
zipメソッドは複数のFlowable/Observableが通知するそれぞれのデータを引数の関数型インターフェースに渡すことで新たなデータを生成して通知するオペレータです。通知のタイミングは各Flowable/Observableのデータがそろうのを待ってから新しいデータを生成するため、通知ペースが遅いFlowable/Observableに合わせることになります。完了を通知するタイミングは、データ数が最も少ないFlowable/Observableが完了したタイミングになります。
また、引数には、結合するFlowable/Observableの他に、それぞれの受け取ったデータをどのように変換し新しいデータを生成するのかを定義する関数型インターフェースを持っています。この関数型インターフェースは受け取るFlowable/Observableの数によって、BiFunction、Function3、Function4、……と変わっていきます。
// 引数のFlowable/Observableが通知したそれぞれのデータを格納したListを生成する new BiFunction<Long, Long, List<Long>>() { @Override public List<Long> apply(Long data1, Long data2) throws Exception { return Arrays.asList(data1, data2); } };
サンプル
次のサンプルでは、intervalメソッドで生成した2つのFlowableを引数に取るzipメソッドを使って、元となる各Flowableのデータがそろったタイミングで新しいデータを生成し通知しています。今回はそれぞれのFlowableから受け取ったデータを格納したListを通知するようにしています。
public static void main(String[] args) throws Exception { // 結合対象 Flowable<Long> flowable1 = // (1) // 300ミリ秒ごとにデータを通知する Flowable.interval(300L, TimeUnit.MILLISECONDS) // 5件まで .take(5); // 結合対象 Flowable<Long> flowable2 = // (2) // 500ミリ秒ごとにデータを通知する Flowable.interval(500L, TimeUnit.MILLISECONDS) // 3件まで .take(3) // 100加算する .map(data -> data + 100L); // 複数のFlowableから受け取ったデータで新しいデータを生成する Flowable<List<Long>> result = Flowable.zip( // (3) // 結合するFlowable flowable1, // 結合するFlowable flowable2, // 引数より通知されたデータをListに格納して通知 (data1, data2) -> Arrays.asList(data1, data2)); // 購読する result.subscribe(new DebugSubscriber<>()); // しばらく待つ Thread.sleep(2000L); }
- intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
- intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って3件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」「102」を通知するFlowableになる
- zipメソッドを使って(1)と(2)のFlowableが通知したデータを受け取り、それらのデータを格納したListを、結果として通知する新しいデータとして生成する
RxComputationThreadPool-2: [0, 100] RxComputationThreadPool-2: [1, 101] RxComputationThreadPool-2: [2, 102] RxComputationThreadPool-2: 完了
実行結果より、引数全てのFlowableの通知データがそろったタイミングで新しいデータを生成し、そのデータを通知していることがわかります。また、完了のタイミングは3件しか生成されていないことから、最もデータ数が少ないFlowableが完了したタイミングで実行されていることがわかります。逆に言うと、データ数が多いFlowableが持つ超過したデータは通知されることはありません。