今回もサンプルはFlowableを使って実装していますが、Observableの場合もバックプレッシャーを扱わない限り、基本的にはほぼ同じ使い方になります。また、データを受け取るSubscriberとして、特別なことをしない限り、第5回で作成したDebugSubscriberを使用しています。
対象読者
- Java経験者(初心者可)
- RxJava未経験者
- リアクティブプログラミング未経験者
※ ただし、前回までの連載を読んでいる前提です。
RxJavaの非同期処理
RxJavaでは開発者が非同期処理の設定やオペレータ内で時間を扱う処理を行わない限り、基本的には生産者(Flowable/Observable)の処理が実行されるスレッド上で、各オペレータの処理や消費者(Subscriber/Observer)の処理が行われます。つまり、生産者がデータを生成してそのデータを通知するまで後続の処理は処理を行えず待機することになり、そのデータを受け取ったオペレータや消費者が処理をしている間、元の生産者やその他のオペレータはデータを受け取った側の処理が終わるまで待っていることになります。それに対し、開発者が非同期処理の設定をすることで、生産者、オペレータ、消費者のそれぞれがお互いの処理を待つことなく自分自身のペースで処理を行えるようになります。
また、RxJavaのオペレータの中にはflatMapメソッドのように内部でFlowable/Observableを生成するものもあります。その際に内部で生成するFlowable/Observableが異なるスレッド上で処理を行うものの場合、通知データの生成処理はそれぞれの処理が一つずつ順番に行われるわけではなく、あるデータの生成処理中に別のデータの生成処理も実行されるようになります。そして、結果となるデータは処理が終わった順に通知されていきます。そのため、このことを意識していないと結果のデータが元の通知順と同じように通知されなくなる可能性があるため、注意が必要です。
加えて、バージョン2.0.5より並列処理を行うための新しいParallelFlowableが追加されました。今回はこの新しいParallelFlowableについても簡単に紹介します。ただし、この機能はまだベータ(BETA)版であるため、後のバージョンアップで変わる可能性は高いです(今回は2.1.2で検証しています)。
RxJavaの非同期処理について主に以下を見ていきます。
- 開発者による非同期処理の設定方法
- オペレータ内で生成される非同期のFlowable/Observable
- 2.0.5より追加されたparallelモード(並列モード)
開発者による非同期処理の設定方法
RxJavaで非同期処理を行う場合は、生産者が処理を行うスレッドとデータを受け取る側のスレッドの、2つのポイントについて管理する必要があります。RxJavaでは生産者が処理を実行するスレッドの種類を設定できるsubscribeOnメソッドと、データを受け取る側のスレッドの種類を設定できるobserveOnメソッドを提供しています。これらのスレッドの種類を設定するには、用途に応じたスレッドの管理を行うSchedulerを指定します。
subscribeOnメソッドおよびobserveOnメソッドと比べると使用頻度は低くなると思いますが、cancelメソッドやdisposeメソッドを呼んだ後に実行される、購読解除の処理を行う時(doOnCancel)のSchedulerを設定するunsubscribeOnメソッドもあります。unsubscribeOnメソッドは、このメソッドの後に設定されているdoOnCancelメソッドには適用されないので注意してください。
Scheduler
SchedulerはRxJavaで用意されているスレッドを管理するクラスです。RxJavaでは基本的に直接Java標準のAPIを触らず、Schedulerを使って非同期処理を行うことができるようになっています。このSchedulerは用途によっていくつかの種類が用意されており、目的のSchedulerを取得するためのメソッドを持つSchedulersによって必要なSchedulerを取得します。また、Java標準のAPIにあるjava.util.concurrent.Executorを使ってそのExecutorを使ったSchedulerを作成することも可能です。
Schedulersが持つ取得メソッド | 概要 |
---|---|
computation() | コンピューティング処理を行う際に使うScheduler。デフォルトではスレッドプールに論理プロセッサ数と同じ数のスレッドを用意し、それらのスレッドを使いまわす。 ※I/O用の処理では使わない。 |
io() | I/Oの処理を行う際に使うScheduler。基本的にスレッドプールからスレッドを取得し、スレッドプールになければ新規のスレッドを生成する。 |
single() | 単一のスレッドのみ用意し、そのスレッドしか使わないScheduler。 |
newThread() | 毎回新しいスレッドを生成するScheduler。 |
from(Executor executor) | 指定したExecutorから生成されるスレッド上で処理をするScheduler。 |
trampoline() | 呼び出し元のスレッドに新たな処理をキューとして入れるScheduler。既に他の処理がキューに入っていれば、その処理が終わってから実行される。 |
subscribeOnメソッド
subscribeOnメソッドは最初にデータを生成し通知する元となる生産者(Flowable/Observable)の処理を、どのようなScheduler上で行うのかを設定するオペレータです。
現在、安定版として公開されているsubscribeOnメソッドは以下になります。
- subscribeOn(Scheduler scheduler)
subscribeOnメソッドは生産者が行う処理のSchedulerを設定する性質上、1回しか設定できません。一度、subscribeOnメソッドを使ってSchedulerを設定すると、それより後に指定したsubscribeOnメソッドのSchedulerは無視されます。
public static void main(String[] args) throws Exception { Flowable.just(1, 2, 3, 4, 5) // Flowableの設定 .subscribeOn(Schedulers.computation()) // RxComputationThreadPool .subscribeOn(Schedulers.io()) // RxCachedThreadScheduler .subscribeOn(Schedulers.single()) // RxSingleScheduler .subscribe(data -> { String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": " + data); }); // しばらく待つ Thread.sleep(500); }
RxComputationThreadPool-1: 1 RxComputationThreadPool-1: 2 RxComputationThreadPool-1: 3 RxComputationThreadPool-1: 4 RxComputationThreadPool-1: 5
この結果より、先の例ではSchedulers.computation()のSchedulerが設定されていることがわかります。ただしこのサンプルのように、subscribeOnメソッドで何度もSchedulerを設定することは混乱の原因になるので避けるべきです。また、一度しか設定できない性質により、intervalメソッドで生成された生産者のようにデフォルトで生産者のSchedulerが指定してある場合は、開発者がsubscribeOnメソッドで異なるSchedulerを設定しても、そのSchedulerの設定は反映されないことを注意しないといけません。元からデフォルトのSchedulerが設定される生産者の場合、その生成メソッドの引数にSchedulerを受け取るものが用意されており、そこに指定したSchedulerを設定することで意図したScheduler上で処理を行う生産者が生成できます。
observeOnメソッド
observeOnメソッドはデータを受け取った側の処理をどのようなScheduler上で行うのかを設定するメソッドです。observeOnメソッドで指定したSchedulerが設定したスレッド上で、それ以降の処理を行うようになります。また、observeOnメソッドがデータを受け取る側のSchedulerを指定するため、オペレータごとに異なるSchedulerを指定することが可能です。
observeOnメソッドにはSchedulerのみを引数に取るものを含め、以下のメソッドが用意してあります。引数が省略してあるメソッドは、デフォルト値の設定になります。
- observeOn(Scheduler scheduler)
- observeOn(Scheduler scheduler, boolean delayError)
- observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
引数No | 引数の型 | 説明 |
---|---|---|
第1引数 | Scheduler | スレッド管理を行うクラス。 |
第2引数 | boolean | trueの場合はエラーが発生しても、そのことをすぐには通知せず、バッファしているデータを通知し終えてからエラーを通知する。falseの場合は発生したらすぐにエラーを通知する。デフォルトはfalse。 |
第3引数 | int | 通知待ちのデータをバッファするサイズ。デフォルトでは128。 |
RxJavaでバックプレッシャーを扱う際に特に重要になるのが第3引数で、消費者に通知されるデータは、このバッファされた通知待ちのデータから取得されることになります。ここで指定した数値が、通知側に対してデータ数のリクエストを自動で行うようになっており、そのリクエストを受けて送られたデータがバッファされることになります。つまり「2」を指定すると、内部でrequest(2)が実行されていることになります。以下の図ではBackpressureStrategy.DROPが設定してある生産者に対して、observeOnメソッドのbufferSizeに「2」が指定されています。これは、observeOnメソッドの下流にある消費者がデータ数のリクエストをLong.MAX_VALUEにしても、observeOnメソッド側でリクエストして受け取ったデータしか消費者に通知されないことを表しています。消費者はobserveOnメソッドが持つ全てのデータを受け取ることになりますが、生産者側のデータに関しては全て受け取れるわけではないことを表しています。
第3引数を設定しない場合は最初にデフォルトの128件のデータ数についてリクエストを行い、データの通知が75%(デフォルトだと96件)に達したタイミングで元のデータ数の75%分についてリクエストをすることを繰り返し行います。
unsubscribeOnメソッド
unsubscribeOnメソッドは、Flowable/Observableがcancelメソッドやdisposeメソッドが呼ばれて購読を解除された際の処理を、引数に指定したSchedulerのスレッド上で実行するオペレータです。これは、doOnCancel/doOnDisposeメソッドで定義した処理やcreateメソッドで通知処理を行うEmitterで設定したCancellableやDisposableの処理が購読解除した際に、unsubscribeOnメソッドの引数におけるSchedulerのスレッド上で実行されるようになります。ただし、完了やエラーで処理を終了した場合にはこのSchedulerは使われません。