CodeZine(コードジン)

特集ページ一覧

簡単なサンプルを作って学ぶRxJava(1.x)

RxJavaによるリアクティブプログラミング入門(2)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/09/21 14:00

目次

RxJavaを使うメリット・デメリット

 さて、今回のサンプルの場合、RxJavaを使って実装してもあまりメリットがあるようには見えません。仮に今回のサンプルが出力している内容を、RxJavaを使わず一般的な実装をすると次のようになるかと思います。

public static void main(String[] args) {
  // 実行しているThread名の取得
  String threadName = Thread.currentThread().getName();
  // データの出力
  System.out.println(threadName + ": Hello, World!");
  System.out.println(threadName + ": こんにちは、世界!");
  // 完了時の出力
  System.out.println(threadName + ": 完了しました");
}

 このようにRxJavaを使った実装と比べると、かなりすっきりとした実装をすることができます。あまりに単純な処理の場合、RxJavaを使わず素直に実装したほうが何をしたいのかが分かりやすく管理もしやすいこともあります。

 では、データの生成が複雑だったり受け取ったデータの処理が複雑だったりした場合はどうでしょうか? 先ほどの一般的な実装の場合、処理の中にデータを生成する処理("Hello, World!"と"こんにちは、世界!")とデータを消費する処理(System.out.println)が混ざっています。そのため、もしデータの生産に関し何らかの変更があった場合、データを消費する側にも影響する可能性が高くなります。同様に、消費する側に変更があれば生産する側にも影響が出る可能性があります。そうなると、どちらかの処理に変更が入るたびに両方の処理の影響を考えないといけなくなります。これは実装だけではなくテストにも影響することを考慮しないといけません。

 しかし、RxJavaの場合は、生産側と消費側の役割が明確に分離されています。そのため生産側と消費側はそれぞれの役割にだけ集中することができ、どちらかに変更があったとしても、もう片方に影響を与えることを極力減らすことが可能です。

 また、この分離によって生産側はデータの生産をし消費側に渡すことまでが責任の範囲になるので、消費側の処理の結果を待つ必要がなくなります。そのため、消費側の処理が非同期で行っても生産側に影響を与えません。つまり、消費側が処理を行っている途中でも、生産側は先に処理を進めることが可能になります。

 このように、RxJavaを使うことによって、簡単に役割を明確に分離できるため、それぞれが複雑な処理を行う際にお互いに与える影響を少なくする実装が可能になります。また、この分離によって非同期で処理を行うことが容易になります。

非同期処理の簡単な説明

 それでは非同期の話が出たところで、簡単にRxJavaで行える非同期の処理について見ていきましょう。RxJavaには大きく分けて2つの箇所で非同期処理の設定ができます。一つは生産側であるObservableの処理に対して。もう一つは通知したデータを受け取った際の処理に対してです。前者はsubscribeOnメソッドを使って設定でき、後者はobserveOnメソッドを使って設定できます。

  • subscribeOnメソッド:生産側であるObservableの処理をどのようなスレッド上で行わせるかの設定ができるメソッド。
  • observeOnメソッド: 通知したデータを受け取った時の処理をどのようなスレッド上で行わせるかの設定ができるメソッド。

 ちなみにobserveOnメソッドが設定するのはOberverに対してのみでなく、通知するデータの変換やフィルタをするObservableのインスタンスメソッドを使って元となるObservableから新しく生成されたObservableにも設定されます。

 RxJavaでは非同期処理を行うのにスレッドを直接的に使うのではなくrs.Schedulerを使って設定します。このSchedulerクラスは抽象クラスであり、通常使われるSchedulerはrx.schedulers.Schedulersのメソッドを使って対象のSchedulerを取得します。今回はSchedulerを生成するメソッドには、どのようなものがあるのかについての説明はしないので、興味ある方はSchedulersのAPIを参照ください。

 それでは先ほどのあいさつの言葉を出力するサンプルに対し、Observerの処理を非同期で行うように変更してみましょう。今回はメインのスレッドと異なるスレッドで行うため、Schedulers.computation()で取得できるSchedulerを使います。このSchedulerはスレッドプールにスレッドがあればそこから取得し、なければ新しいスレッドを生成し使用後にスレッドプールに戻すSchedulerです。デフォルトではスレッドプールに保持するスレッド数は実行環境の論理スレッド数になります。

// あいさつの言葉を標準出力するサンプル
public static void main(String[] args) throws Exception {
  
  // あいさつの言葉を通知するObservableの生成
  Observable<String> observableGreeting =
      Observable.just("Hello, World!", "こんにちは、世界!");
  
  observableGreeting
      // 通知後の処理を非同期で行うためのSchedulerを設定
      .observeOn(Schedulers.computation())  // ・・・(1)
      // Observableを購読し処理を開始する
      .subscribe(new Observer<String>() {
        
        // Observableからのデータを受け取った際の処理
        @Override
        public void onNext(String item) {
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          // Observableからのデータをそのまま標準出力する
          System.out.println(threadName + ": " + item);
        }
        
        // Observableから完了を通知された際の処理
        @Override
        public void onCompleted() {
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          System.out.println(threadName + ": 完了しました");
        }
        
        // Observableからエラーを通知された際の処理
        @Override
        public void onError(Throwable e) {
          e.printStackTrace();
        }
      });
  
  // 非同期で行われていることを確認するため出力する
  String threadName = Thread.currentThread().getName();
  System.out.println(threadName + ": subscribed!");
  
  // しばらく待つ
  Thread.sleep(1000L);  // ・・・(2)
}
実行結果
main: subscribed!
RxComputationScheduler-1: Hello, World!
RxComputationScheduler-1: こんにちは、世界!
RxComputationScheduler-1: 完了しました

 この実行結果は環境によって異なる可能性(例えば"subscribed!"が"Hello, World!"の後に出力されるなど)がありますが、今回の場合は実行結果より、Observableが購読された後に非同期で行っているObserverの処理を待たずに"main: subscribed!"が出力されています。その間Observerは別のスレッド上で自身の処理を行い、受け取ったデータを出力して最後に完了時の出力をしています。これがもし、(1)のSchedulerの設定がされていない場合はデータ通知後の処理は非同期で行われず、どの環境で実行しても次のような結果になります。

main: Hello, World!
main: こんにちは、世界!
main: 完了しました
main: subscribed!

 それでは非同期で処理を行うことによるメリットについて簡単に考えてみましょう。非同期処理のメリットはAndroidなどのGUIプログラミングを行う際に顕著になります。もし、Observableをテキストの入力部品と見なし入力値の値が変わるたびにデータを通知するとした場合、非同期で処理を行っていないと、データを通知した後にそのデータが処理されるまで画面がフリーズしてしまいます。データを受け取った後の処理がすぐに終わるような場合はフリーズしている時間も一瞬のため気にもなりませんが、これが時間がかかるような処理の場合、画面が再び処理をできるようになるまで時間がかかることになります。つまり処理が終わるまで操作が行えず、かかる時間によっては大きな問題になります。

 このように非同期で処理を行えることは、ある処理を行っている最中に別の処理を行えることができるというメリットがあります。しかし、このように複数のスレッドを使ってそれぞれの処理を同時に実行できるということは異なるスレッド上の処理に関しては順番が保証されない、つまり、今回のサンプルだとメインスレッドから出力される"subscribed!"と別スレッドから出力される"Hello,World!"などの通知データのどちらが先に出力されるのかは保証されません。このことは特に複数のスレッドから一つのインスタンスにアクセスする場合に大きな問題になります。この他にもいろいろなメリットや、もちろんデメリットもありますが、とりあえずは非同期処理を行うことで、このようなメリットやデメリットがあることを覚えておきましょう。

 また、今回のサンプルでは(2)のように処理の最後にThread.sleep(1000L);で別スレッドの処理が終わるまでmainのスレッドを待機させています。もしこの処理を入れていないと、次のように非同期で行っているデータを受け取った後の処理を行えずにプログラムが終わってしまう結果になる可能性が高いです。

main: subscribed!

 これはObserverが処理を行おうとしている間にmainメソッドの処理が最後まで実行されてしまい、プログラムを終了しているためです。そのため、非同期で行っている処理を待たずにプログラム自体が終了してしまうことが困る場合には注意する必要があります。

 ただし、今回のサンプルのように処理の最後でThread.sleepを使ってスレッドを止めたり、RxJavaでデバッグやテスト用として用意されている処理が終わるまで待つようなメソッドを使って処理を止めることは、あまり良いプラクティスではありません。もし、このように処理を止める必要がある場合は、そもそも非同期で処理を行う必要があるのかどうかも含め、アプリケーションの設計を見直したほうが良いでしょう。

まとめ

 今回は現在の安定板であるRxJava 1.xを使った簡単なサンプルを作成しました。次回は、今回のサンプルを元に次のメジャーバージョンとなるRxJava 2.xを使った場合の実装を見ていきます。



  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5