対象読者
- Java経験者(初心者可)
- RxJava未経験者
- リアクティブプログラミング未経験者
※ただし、前回までの連載を読んでいる前提です。
RxJavaのオペレータとは
RxJavaのオペレータとは、前述したように新たにFlowableやObservableを生成したり、生成したFlowableやObservableが通知するデータの選別や変換などを行ったデータを通知する新たなFlowableやObservableなどを生成したりするメソッドのことです。オペレータの結果として生成されるものは、基本的にはFlowableから呼ばれればFlowableで、Observableから呼ばれればObservableを生成します。例外として、最終的に通知するデータが1件以内の場合、戻り値が今回はまだ紹介しないSingleやMaybeやCompletableになることもありますが、基本的な考え方は同じです。この戻り値がFlowable/Observableを返す性質からメソッドをつなげていくことで、データがいくつかのオペレータの処理を通過し消費者に対して使いやすくなったデータを通知する最終的なFlowable/Observableを生成できます。
また、実装でもオペレータをつなげていくメソッドチェインで、実装を簡潔にすることも可能です。例えば、次のサンプルはjustメソッドを使って引数のデータを通知するFlowableを生成し、filterメソッドを使って、そのFlowableが通知するデータを偶数のみにし、mapメソッドを使って通知するデータを10倍にしているFlowableをメソッドチェインを使って実装しています。
public static void main(String[] args) { Flowable<Integer> result = Flowable // 引数のデータを通知するFlowableを生成 .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // 偶数のデータのみを通知する .filter(data -> data % 2 == 0) // 通知するデータを10倍にする .map(data -> data * 10); // 購読する result.subscribe(data -> System.out.println(data)); }
これを実行すると次の結果が出力されます。
20 40 60 80 100
これをメソッドチェインを使わずに実装すると次のようになります。
public static void main(String[] args) { // 引数のデータを通知するFlowableを生成 Flowable<Integer> flowable1 = Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); // 偶数のデータのみを通知する Flowable<Integer> flowable2 = flowable1.filter(data -> data % 2 == 0); // 通知するデータを10倍にする Flowable<Integer> result = flowable2.map(data -> data * 10); // 購読する result.subscribe(data -> System.out.println(data)); }
このようにメソッドチェインを使わないと、途中で次のメソッドを呼ぶためだけの不要なインスタンスを定義する必要が出てきてしまいます。しかし、メソッドチェインを使うとこのような不要なインスタンスを生成する必要がなくなり、変数名を間違えてしまったりするようなことがなくなります。また、元のデータを最終的にどうしたいのかも、実装の一連の流れから分かりやすくなるメリットもあります。
また、一見するとデザインパターンの一つであるビルダーパターンのように見えますが、ビルダーパターンと異なり、オペレータを設定した時点でオペレータの処理を適用した新たなFlowableが生成されています。さらにこのことは、オペレータをつなげる順序によってどのようなデータが通知されるのかにも影響し、かつパフォーマンスにも影響します。例えば、10倍した後に偶数かどうかを判定するとすべてのデータが偶数になりますし、仮に10倍せず違う処理をしてデータのフィルターした場合、意図したようにデータのフィルターができたとしても、最終的に通知されない不要なデータに対して処理を行っていることになり、それがコストのかかる処理の場合は無駄にコストをかけていることになります。そのためRxJavaでメソッドチェインを使う際はオペレータの順番について何が効率的なのかを意識しないといけません。
加えて、RxJavaのオペレータの多くは引数に関数型インターフェースを受け取るようになっています。関数型インターフェースとは簡単に説明すると、実装すべきメソッドを1つだけ持つインターフェースです。この関数型インターフェースを実装するのにJava 8からラムダ式という関数型インターフェースの実装を簡易化した記述ができるようになっています。ここではラムダ式については特に説明しませんが、ラムダ式を使うと簡潔に実装ができ、かつ可読性も上がることが多いです。RxJavaの多くのオペレータは引数に関数型インターフェースを受け取るようになっているので、ラムダ式を使えるようにした方がよいでしょう。また、Java 8が使えない環境の場合、retrolambdaなどのラムダ式を古いJavaのバージョンでも使えるようにするライブラリもあるので、可能ならばラムダ式を使って実装したほうが良いでしょう。
では、今回のサンプルが何を行っているのかについて細かく見ていきましょう。まずjustメソッドを使って引数のデータである次のデータを通知するFlowableを生成しています。
1 2 3 4 5 6 7 8 9 10
このFlowableに対しfilterメソッドを使って、偶数のデータのみを通知するようにしています。filterメソッドの引数にはRxJavaのPredicateの関数型インターフェースを受け取るようになっており、どのデータを通知するのかを判別しています。また、このPredicateはJava 8のPredicateではないので、パッケージがio.reactivex.functions
になっているのと、Exceptionをthrowできるようになっていることに注意してください。
filter(Predicate<? super T> predicate)
public interface Predicate<T> { boolean test(T t) throws Exception; }
このfilterメソッドのPredicateはどのデータを通知するのかを判定する関数型インターフェースで、実装すべきメソッドとしてtestメソッドを持っています。このtestメソッドは引数に通知するデータを受け取り、そのデータを通知するかどうかを判定し、通知するならtrue
を、通知しないならfalse
を返すように定義します。今回のサンプルのfilterメソッドでは偶数なら通知するように実装しています。
.filter(data -> data % 2 == 0)
このサンプルはラムダ式を使って実装していますが、これをラムダ式を使わずに実装すると次のようになります。
.filter(new Predicate<Integer>() { @Override public boolean test(Integer data) throws Exception { return data % 2 == 0; } })
つまり、testメソッドで通知データであるdata
を受け取り、それが偶数ならtrue
を返すようにしています。そうすることで、ここでは結果がtrue
になるデータのみが通知されるようになり、このfilterメソッドから生成されたFlowableは次のデータを通知します。
2 4 6 8 10
そして、このFlowableに対しmapメソッドを使って、通知するデータを10倍するようにしています。mapメソッドの引数にはRxJavaのFunctionの関数型インターフェースを受け取るようになっており、受け取ったデータをどのように変換するのかを定義しています。また、このFunctionはJava 8のFunctionではないので、パッケージがio.reactivex.functions
になっているのと、Exceptionをthrowできるようになっていることに注意してください。
map(Function<? super T, ? extends R> mapper)
public interface Function<T, R> { R apply(T t) throws Exception; }
このmapメソッドのFunctionは受け取ったデータをどのように変換して通知するのかを定義する関数型インターフェースで、実装すべきメソッドとしてapplyメソッドを持っています。このapplyメソッドは引数に通知するデータを受け取り、そのデータから新たなデータを生成して返します。そしてこの戻り値のデータが結果として通知されるデータとなります。このサンプルのmapメソッドでは受け取ったデータを10倍して返すように実装しています。
.map(data -> data * 10)
このサンプルはラムダ式を使って実装していますが、これをラムダ式を使わずに実装すると次のようになります。
.map(new Function<Integer, Integer>() { @Override public Integer apply(Integer data) throws Exception { return data * 10; } })
つまり、applyメソッドで通知データであるdata
を受け取り、それを10倍したデータを返すようにしています。そして、その戻り値が通知されるデータとなります。このmapメソッドから生成されたFlowableは次のデータを通知します。
20 40 60 80 100
それでは、簡単にオペレータの実装例を見たところで、RxJavaにはどのようなオペレータが用意されているのか、主なオペレータについて見ていきましょう。今回のサンプルはFlowableを使って実装していますが、Observableの場合もバックプレッシャーを扱わない限り、基本的にはほぼ同じ使い方になります。また、今後のサンプルではデバッグ用のSubscriberとして次のものを用意しています。
/** サンプル用のSubscriber */ public class DebugSubscriber<T> extends ResourceSubscriber<T> { private String label; public DebugSubscriber() { super(); } public DebugSubscriber(String label) { super(); this.label = label; } @Override public void onNext(T data) { // onNextメソッドの呼び出し時に出力 String threadName = Thread.currentThread().getName(); if (label == null) { System.out.println(threadName + ": " + data); } else { System.out.println(threadName + ": " + label + ": " + data); } }; @Override public void onError(Throwable throwable) { // onErrorメソッドの呼び出し時に出力 String threadName = Thread.currentThread().getName(); if (label == null) { System.out.println(threadName + ": エラー = " + throwable); } else { System.out.println(threadName + ": " + label + ": エラー = " + throwable); } } @Override public void onComplete() { // onCompleteメソッドの呼び出し時に出力 String threadName = Thread.currentThread().getName(); if (label == null) { System.out.println(threadName + ": 完了"); } else { System.out.println(threadName + ": " + label + ": 完了"); } } }
サンプルでは、Subscriberで特殊なことをしない限りは、このDebugSubscriberを使っています。