SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

RxJavaによるリアクティブプログラミング入門

RxJavaの非同期処理と新機能ParallelFlowable

RxJavaによるリアクティブプログラミング入門(7)

  • X ポスト
  • このエントリーをはてなブックマークに追加

 この連載はRxJava 2.xを使って、リアクティブプログラミングにおけるポイントやRxJavaが持つ機能について学んでいくことを目的としています。前回まではFlowable/Observableの生成を行うオペレータからそれらのデータを操作するさまざまなオペレータなど、代表的なものについて見てきました。今回はそれらの処理をどのように非同期で実行することができるのかについて見ていきます。加えて、今回はバージョン2.0.5より追加されたParallelFlowableについて簡単に解説します(今回はRxJava 2.1.2で検証しています)。

  • X ポスト
  • このエントリーをはてなブックマークに追加

 今回もサンプルはFlowableを使って実装していますが、Observableの場合もバックプレッシャーを扱わない限り、基本的にはほぼ同じ使い方になります。また、データを受け取るSubscriberとして、特別なことをしない限り、第5回で作成したDebugSubscriberを使用しています。

対象読者

  • Java経験者(初心者可)
  • RxJava未経験者
  • リアクティブプログラミング未経験者

 ※ ただし、前回までの連載を読んでいる前提です。

RxJavaの非同期処理

 RxJavaでは開発者が非同期処理の設定やオペレータ内で時間を扱う処理を行わない限り、基本的には生産者(Flowable/Observable)の処理が実行されるスレッド上で、各オペレータの処理や消費者(Subscriber/Observer)の処理が行われます。つまり、生産者がデータを生成してそのデータを通知するまで後続の処理は処理を行えず待機することになり、そのデータを受け取ったオペレータや消費者が処理をしている間、元の生産者やその他のオペレータはデータを受け取った側の処理が終わるまで待っていることになります。それに対し、開発者が非同期処理の設定をすることで、生産者、オペレータ、消費者のそれぞれがお互いの処理を待つことなく自分自身のペースで処理を行えるようになります。

 また、RxJavaのオペレータの中にはflatMapメソッドのように内部でFlowable/Observableを生成するものもあります。その際に内部で生成するFlowable/Observableが異なるスレッド上で処理を行うものの場合、通知データの生成処理はそれぞれの処理が一つずつ順番に行われるわけではなく、あるデータの生成処理中に別のデータの生成処理も実行されるようになります。そして、結果となるデータは処理が終わった順に通知されていきます。そのため、このことを意識していないと結果のデータが元の通知順と同じように通知されなくなる可能性があるため、注意が必要です。

 加えて、バージョン2.0.5より並列処理を行うための新しいParallelFlowableが追加されました。今回はこの新しいParallelFlowableについても簡単に紹介します。ただし、この機能はまだベータ(BETA)版であるため、後のバージョンアップで変わる可能性は高いです(今回は2.1.2で検証しています)。

 RxJavaの非同期処理について主に以下を見ていきます。

  • 開発者による非同期処理の設定方法
  • オペレータ内で生成される非同期のFlowable/Observable
  • 2.0.5より追加されたparallelモード(並列モード)

開発者による非同期処理の設定方法

 RxJavaで非同期処理を行う場合は、生産者が処理を行うスレッドとデータを受け取る側のスレッドの、2つのポイントについて管理する必要があります。RxJavaでは生産者が処理を実行するスレッドの種類を設定できるsubscribeOnメソッドと、データを受け取る側のスレッドの種類を設定できるobserveOnメソッドを提供しています。これらのスレッドの種類を設定するには、用途に応じたスレッドの管理を行うSchedulerを指定します。

subscribeOnメソッドとobserveOnメソッド
subscribeOnメソッドとobserveOnメソッド

 subscribeOnメソッドおよびobserveOnメソッドと比べると使用頻度は低くなると思いますが、cancelメソッドやdisposeメソッドを呼んだ後に実行される、購読解除の処理を行う時(doOnCancel)のSchedulerを設定するunsubscribeOnメソッドもあります。unsubscribeOnメソッドは、このメソッドの後に設定されているdoOnCancelメソッドには適用されないので注意してください。

Scheduler

 SchedulerはRxJavaで用意されているスレッドを管理するクラスです。RxJavaでは基本的に直接Java標準のAPIを触らず、Schedulerを使って非同期処理を行うことができるようになっています。このSchedulerは用途によっていくつかの種類が用意されており、目的のSchedulerを取得するためのメソッドを持つSchedulersによって必要なSchedulerを取得します。また、Java標準のAPIにあるjava.util.concurrent.Executorを使ってそのExecutorを使ったSchedulerを作成することも可能です。

Schedulersが持つ取得メソッド
Schedulersが持つ取得メソッド 概要
computation() コンピューティング処理を行う際に使うScheduler。デフォルトではスレッドプールに論理プロセッサ数と同じ数のスレッドを用意し、それらのスレッドを使いまわす。 ※I/O用の処理では使わない。
io() I/Oの処理を行う際に使うScheduler。基本的にスレッドプールからスレッドを取得し、スレッドプールになければ新規のスレッドを生成する。
single() 単一のスレッドのみ用意し、そのスレッドしか使わないScheduler。
newThread() 毎回新しいスレッドを生成するScheduler。
from(Executor executor) 指定したExecutorから生成されるスレッド上で処理をするScheduler。
trampoline() 呼び出し元のスレッドに新たな処理をキューとして入れるScheduler。既に他の処理がキューに入っていれば、その処理が終わってから実行される。

subscribeOnメソッド

 subscribeOnメソッドは最初にデータを生成し通知する元となる生産者(Flowable/Observable)の処理を、どのようなScheduler上で行うのかを設定するオペレータです。

subscribeOnメソッド
subscribeOnメソッド

 現在、安定版として公開されているsubscribeOnメソッドは以下になります。

  • subscribeOn(Scheduler scheduler)

 subscribeOnメソッドは生産者が行う処理のSchedulerを設定する性質上、1回しか設定できません。一度、subscribeOnメソッドを使ってSchedulerを設定すると、それより後に指定したsubscribeOnメソッドのSchedulerは無視されます。

subscribeOnメソッドで有効になるSchedulerの確認
public static void main(String[] args) throws Exception {
  Flowable.just(1, 2, 3, 4, 5) // Flowableの設定
      .subscribeOn(Schedulers.computation()) // RxComputationThreadPool
      .subscribeOn(Schedulers.io()) // RxCachedThreadScheduler
      .subscribeOn(Schedulers.single()) // RxSingleScheduler
      .subscribe(data -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ": " + data);
      });
  // しばらく待つ
  Thread.sleep(500);
}
実行結果
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 5

 この結果より、先の例ではSchedulers.computation()のSchedulerが設定されていることがわかります。ただしこのサンプルのように、subscribeOnメソッドで何度もSchedulerを設定することは混乱の原因になるので避けるべきです。また、一度しか設定できない性質により、intervalメソッドで生成された生産者のようにデフォルトで生産者のSchedulerが指定してある場合は、開発者がsubscribeOnメソッドで異なるSchedulerを設定しても、そのSchedulerの設定は反映されないことを注意しないといけません。元からデフォルトのSchedulerが設定される生産者の場合、その生成メソッドの引数にSchedulerを受け取るものが用意されており、そこに指定したSchedulerを設定することで意図したScheduler上で処理を行う生産者が生成できます。

observeOnメソッド

 observeOnメソッドはデータを受け取った側の処理をどのようなScheduler上で行うのかを設定するメソッドです。observeOnメソッドで指定したSchedulerが設定したスレッド上で、それ以降の処理を行うようになります。また、observeOnメソッドがデータを受け取る側のSchedulerを指定するため、オペレータごとに異なるSchedulerを指定することが可能です。

observeOnメソッド
observeOnメソッド

 observeOnメソッドにはSchedulerのみを引数に取るものを含め、以下のメソッドが用意してあります。引数が省略してあるメソッドは、デフォルト値の設定になります。

  • observeOn(Scheduler scheduler)
  • observeOn(Scheduler scheduler, boolean delayError)
  • observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
observeOnメソッドの引数
引数No 引数の型 説明
第1引数 Scheduler スレッド管理を行うクラス。
第2引数 boolean trueの場合はエラーが発生しても、そのことをすぐには通知せず、バッファしているデータを通知し終えてからエラーを通知する。falseの場合は発生したらすぐにエラーを通知する。デフォルトはfalse。
第3引数 int 通知待ちのデータをバッファするサイズ。デフォルトでは128。

 RxJavaでバックプレッシャーを扱う際に特に重要になるのが第3引数で、消費者に通知されるデータは、このバッファされた通知待ちのデータから取得されることになります。ここで指定した数値が、通知側に対してデータ数のリクエストを自動で行うようになっており、そのリクエストを受けて送られたデータがバッファされることになります。つまり「2」を指定すると、内部でrequest(2)が実行されていることになります。以下の図ではBackpressureStrategy.DROPが設定してある生産者に対して、observeOnメソッドのbufferSizeに「2」が指定されています。これは、observeOnメソッドの下流にある消費者がデータ数のリクエストをLong.MAX_VALUEにしても、observeOnメソッド側でリクエストして受け取ったデータしか消費者に通知されないことを表しています。消費者はobserveOnメソッドが持つ全てのデータを受け取ることになりますが、生産者側のデータに関しては全て受け取れるわけではないことを表しています。

バッファサイズと通知の関係(DROPの場合)
バッファサイズと通知の関係(DROPの場合)

 第3引数を設定しない場合は最初にデフォルトの128件のデータ数についてリクエストを行い、データの通知が75%(デフォルトだと96件)に達したタイミングで元のデータ数の75%分についてリクエストをすることを繰り返し行います。

unsubscribeOnメソッド

 unsubscribeOnメソッドは、Flowable/Observableがcancelメソッドやdisposeメソッドが呼ばれて購読を解除された際の処理を、引数に指定したSchedulerのスレッド上で実行するオペレータです。これは、doOnCancel/doOnDisposeメソッドで定義した処理やcreateメソッドで通知処理を行うEmitterで設定したCancellableやDisposableの処理が購読解除した際に、unsubscribeOnメソッドの引数におけるSchedulerのスレッド上で実行されるようになります。ただし、完了やエラーで処理を終了した場合にはこのSchedulerは使われません。

unsubscribeOnメソッド
unsubscribeOnメソッド

会員登録無料すると、続きをお読みいただけます

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

次のページ
オペレータ内で生成される非同期のFlowable/Observable

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
RxJavaによるリアクティブプログラミング入門連載記事一覧

もっと読む

この記事の著者

須田 智之(スダ トモユキ)

十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/10159 2017/09/13 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング