データを変換して通知するオペレータ
Flowable/Observableが通知するデータをそのまま通知するのではなく、そのデータから新たなデータを生成して通知したいことがよくあります。例えば、商品のIDを通知するFlowable/ObservableからIDを受け取り、そのIDに該当する商品のオブジェクトを取得してデータとして通知したい場合など、何らかの変換を行いたい場合です。SubscriberやObserverではなく、オペレータで処理を行うことで、Subscriber/Observerでは最終的なデータを受け取った際の処理のみ行い、オペレータではデータの加工をするように役割を分担できるメリットもあります。ここでは、通知するデータから新たなデータを生成し、そのデータを通知するための代表的なオペレータについて見ていきます。
- map
- flatMap
この他にも通知データを変換するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。
map
主なメソッド
-
map(Function<? super T,? extends R> mapper)
mapメソッドは元のFlowable/Observableから通知されたデータを変換し、その変換したデータを通知するオペレータです。どのように変換するかは引数の関数型インターフェースで定義します。ただし、受け取ったデータに対し必ず1件のデータを通知しなければならず、変換した結果がnullになるようなデータや複数になるデータには変換できません。
また、引数のmapperは元のデータをどのように変換するのかを定義する関数型インターフェースです。この引数のFunctionはRxJavaのものでパッケージはio.reactivex.functions
になります。変換したデータの型は受け取ったデータの型と同じでも違っていてもどちらでも構いません。
// StringのデータをBigDecimalに変換する new Function<String, BigDecimal>() { @Override public BigDecimal apply(String data) throws Exception { return new BigDecimal(data); } }
サンプル
次のサンプルでは元の大文字のデータをmapメソッドを通して小文字のデータに変換して通知するようにしています。
public static void main(String[] args) throws Exception { Flowable<String> flowable = // 引数のデータを順に通知するFlowableの生成 Flowable.just("A", "B", "C", "D", "E") // ① // mapメソッドを使って小文字に変換 .map(data -> data.toLowerCase()); // ② // 購読開始 flowable.subscribe(new DebugSubscriber<>()); }
- "A", "B", "C", "D", "E"を順に通知するFlowableを生成。
-
mapメソッドで元のデータを
toLowerCase()
で小文字に変換し、それを通知するFlowableを生成。
main: a main: b main: c main: d main: e main: 完了
実行結果より大文字のデータを小文字のデータに変換して通知していることが分かります。今回のサンプルでは受け取ったデータと同じ型を返していますが、異なる型を返すことも可能です。
flatMap
Flowable/Observableの主なメソッド
-
flatMap(Function<? super T,? extends Publisher/ObservableSource<? extends R>> mapper)
注
「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。
flatMapメソッドはmapメソッドのように元のデータを変換し、その変換したデータを通知するオペレータです。しかし、flatMapメソッドではmapメソッドと異なり、受け取ったデータから複数のデータを持つFlowable/Observableを返すことで、1つのデータから複数のデータを通知することが可能になります。さらに、空のFlowable/Observableを返すことで特定のデータの通知を止めたり、エラーのFlowable/Observableを返すことでエラーを通知したりすることが可能になります。
このflatMapメソッドの関数型インターフェースでFlowable/Observableを返すということはメソッドチェインにおいて重要になるポイントです。例えば、変換の結果がnullのデータや条件に合わないデータを受け取った場合、空のFlowable/Observableを返すことで、後続の処理にはそのデータを通知しなくしたり、処理中にエラーが発生する場合、エラーのFlowable/Observableを返すことでエラーの通知をしたりできます。
また、引数のmapperは通知されるデータを受け取り、それに対し何らかの変換を行い、その変換をしたデータを通知するFlowable/Observableを返す関数型インターフェースです。最終的にmapperの戻り値となるFlowable/Observableが持つデータがそれぞれ通知されます。そのため関数型インターフェースの戻り値が複数のデータを含むFlowable/Observableを返す場合、1つのデータから複数のデータを通知できるようになります。また、空のFlowable/Observableを返す場合は、そのデータを通知させないようにできます。さらに、エラーを返すFlowable/Observableを戻り値とした場合、エラーが通知され次のデータの処理が行われなくなります。
// 受け取った各データを2度通知するようにする
new Function<Integer, Flowable<? extends Integer>>() {
@Override
public Flowable<? extends Integer> apply(Integer data)
throws Exception {
return Flowable.just(data, data);
}
}
また、この戻り値が時間を扱うようなFlowable/Observableにした場合、そのFlowable/ObservableはflatMapメソッドが実行しているスレッドとは異なるスレッド上で処理を行うので注意が必要です。関数型インターフェースで受け取ったデータからFlowable/Observableを生成する処理自体はシーケンシャルに行われますが、その生成されたFlowable/Observableが行う処理は非同期で行われます。つまり、戻り値となるFlowable/Observableを別スレッド上で動かされることで、通知されるデータがそれぞれ異なるスレッド上で行われた処理結果になるため、受け取ったデータの順に結果が通知がされなくなる可能性があります。もし、データ順にこだわる必要がある場合は、今回は説明していないconcatMapメソッドやconcatMapEagerメソッドを使う必要があります。
サンプル
次のサンプルでは空文字が含まれているデータをflatMapメソッドを通して大文字と小文字の2つのデータを300ミリ秒後に通知するようにしています。その際に空文字を取り除いて通知するようにしています。また、300ミリ秒遅らせるのに今回は説明していないdelayメソッドを使っていますが、このメソッドは受け取った通知を指定した時間だけ遅らせて通知するオペレータです。
public static void main(String[] args) throws Exception {
// 引数のデータを順に通知するFlowableの生成
Flowable<String> flowable = Flowable.just("A", "B", "", "", "C") // ①
// flatMapメソッドを使って空文字を除きかつ小文字に変換
.flatMap(data -> {
if ("".equals(data)) {
// 空文字なら空のFlowableを返す
return Flowable.empty(); // ②
} else {
// 受け取ったデータの大文字と小文字を3000ミリ秒後に通知する
return Flowable
.just(data.toUpperCase(), data.toLowerCase())
.delay(300L, TimeUnit.MILLISECONDS); // ③
}
});
// 購読開始
flowable.subscribe(new DebugSubscriber<>());
// しばらく待つ
Thread.sleep(1000L);
}
- justメソッドより"A", "B", "", "", "C"を順に通知するFlowableを生成。
- 受け取ったデータが空文字なら空のFlowableを返す。
- 受け取ったデータが空文字でないなら、その文字を大文字と小文字の2つのデータを300ミリ秒後に通知するFlowableを返す。
RxComputationThreadPool-1: A
RxComputationThreadPool-1: a
RxComputationThreadPool-3: C
RxComputationThreadPool-3: c
RxComputationThreadPool-2: B
RxComputationThreadPool-2: b
RxComputationThreadPool-2: complete
実行結果より受け取ったデータを大文字と小文字のデータに変換して通知し、かつ空文字のデータを除外していることが分かります。加えて、関数型インターフェースで生成しているFlowableがそれぞれ異なるスレッド上で実行されるため、結果として通知されるデータは受け取った順になっていないこともポイントの一つになります。また、今回のサンプルでは受け取ったデータと同じ型(String)を返していますが、異なる型(Stringでないクラス)のデータを返すことも可能です。
まとめ
今回はFlowableやObservableの基本的なオペレータについて見ていきました。ここで見たのは代表的なもので、ここで紹介したもの以外にも数多くのオペレータが用意されています。実際にRxJavaで実装する際に、ここで紹介したオペレータ以外に欲しいものがある可能性が高いです。その場合は、英語なので読むのが大変かもしれませんが、RxJavaのJavaDocを見てみるのがよいでしょう。その際のコツとしては、JavaDocのマーブルダイアグラムを見て、イメージにあったオペレータかどうかを判別して、もしイメージに合うオペレータなら説明を読むようにします。そして、簡単なサンプルを作って実際の動作確認をしてみるのがよいでしょう。