CodeZine(コードジン)

特集ページ一覧

リアクティブプログラミングとRxJavaの概要

RxJavaによるリアクティブプログラミング入門(1)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/09/01 14:00

目次

RxJava(1.x)の基本的な仕組み

 次期バージョンである「RxJava 2.x」と執筆時点で安定版とされている「RxJava 1.x」とはクラス名やメソッド名などが異なるので、今回はRxJava 1.xについて解説していきます。本稿での「RxJava」はRxJava 1.xのことを指します。それではRxJavaについて見ていきましょう。

 RxJavaの基本的な仕組みは、データを生産し通知する「Observable」と通知されたデータを受け取りそのデータを消費する「Observer」の2つの関係で成り立っています。このObservableをObserverが購読(subscribe)することで、Observableが通知したデータをObserverが受け取ることができるようになります。

  • Observable: データを通知するオブジェクト
  • Observer: データを受け取り処理をするオブジェクト

 そして、Observableが問題なく全てのデータを通知し終えると、すぐに正常終了をしたことをObserverに通知します。もし途中でエラーが発生した場合はエラーオブジェクトをObserverに通知し異常終了したことを伝えます。RxJavaではObservableが正常終了もしくは異常終了したことを伝えた後は何も処理を行わない前提になっています。つまり、正常終了したことを通知した後に他のデータや異常終了を通知することはできず、異常終了を通知した後も他のデータや正常終了をしたことを通知することはできません。

 そのため「終了」だけだと紛らわしくなるので、この連載では正常終了したことを通知することを「完了」を通知すると言い、異常終了したことを通知することを「エラー」を通知すると言うようにします。そして「終了」が通知されるということは、「完了」もしくは「エラー」のどちらかが通知されることを言うようにします。つまり「完了」を通知するということは全てのデータが問題なく通知され正常終了したことを表し、「エラー」を通知するということはObservableの処理中に何らかのエラーが発生し異常終了したことを表します。「終了」を通知すると表現した場合は「完了」もしくは「エラー」のどちらかかを通知することを表します。

  • 完了:正常終了
  • エラー:異常終了
  • 終了:完了(正常終了)もしくはエラー(異常終了)

 このObservableとObserverとの基本的な処理の流れは次のようになります。

  1. ObservableをObserverが購読(subscribe)する。
  2. Observableの処理を開始する。
  3. Observableがデータを生産しObserverに対しそのデータを通知する。
  4. データを受け取ったObserverはそのデータを使って処理を行う。
  5. Observableは全てのデータを通知したら完了の通知を、途中でエラーが発生したらエラーを通知し、処理を終了する。
正常時の処理の流れ
正常時の処理の流れ

 Observableには、Observerから購読されると必ず処理を開始する"Cold"なObservableと、Observerによる購読に関係なく処理を開始することもある"Hot"なObservableがあります。ここではCold Observableについて説明していますが、ObservableがObserverにどのようにデータを通知するかの大枠だけ見ると、ColdでもHotでも基本的にはほぼ同じ処理になります。

Observable

 Observableの通知処理にはObservable契約というガイドラインがあり、次のようなことが書いてあります。

  • 0個以上のデータをシーケンシャル(逐次的)に通知する。
  • 全てのデータを通知し終えたら完了を通知する。
  • 途中でエラーが発生したらエラーを通知する。
  • 完了もしくはエラーを通知したらそれ以降の通知はしない。

 それでは実際にデータを通知するObservableを生成する例を見てみましょう。Observableの生成方法はこの他にもありますが、まずは基本的な仕組みがわかりやすいcreateメソッドを見ていきます。

Observableの実装例
Observable.create(new OnSubscribe<T>() {
  
  @Override
  public void call(Subscriber<? super T> subscriber) {
    try {
      …通知するデータの生成処理など

      // データの通知
      subscriber.onNext(データ);
      subscriber.onNext(データ);
      subscriber.onNext(データ);
      …
      
      // 完了したことを通知する
      subscriber.onCompleted();
    } catch (SomethingException e) {
      // エラーが発生したことを通知する
      subscriber.onError(e);
    }
  }
})

 ここではObservableのstaticファクトリメソッドであるcreateメソッドを使ってObservableを生成しています。ちなみにstaticファクトリメソッドを簡単に説明すると、対照のクラスのインスタンスを返すメソッドで、コンストラクタからの生成に比べ細かい設定が隠蔽できたりなどの利点を持つ、よく使われるプログラミングのテクニックの一つです。このcreateメソッドの引数であるrx.Observable.OnSubscribeは関数型インタフェースであり、実装されるべきメソッドを一つだけ持っています。このOnSubscribeのメソッドを実装することで、Observableが何をいつ通知するのかを定義することができます。

 このOnSubscribeのcallメソッドでは引数にObserverへの通知を行うSubscriberを受け取ります。このSubscriberにはObserverにデータの通知および完了やエラーの通知を行うメソッドが用意されています。

Subscriberの通知メソッド
メソッド 概要
onNext(T t) Observableのデータを通知します。
onCompleted() Observableの処理が完了(正常終了)したことを通知します。
onError(Throwable e) Observableの処理中に発生したエラーを通知します。

 まず、Observerへのデータの通知にはSubscriberのonNextメソッドを使います。このonNextメソッドの引数に通知するデータを渡すと、Observerにそのデータを通知します。通知するデータが複数ある場合はSubscriberのonNextメソッドに順番にデータを渡すことで、Observerにもその順番通りにデータが通知されます。

 ここで重要なことは、Observableがデータを通知する際には、同じスレッドもしくは同期がとられた状態でSubscriberのメソッドが呼ばれることを前提として設計されている点です。例えば、Observableの内部で複数のスレッドを生成し、同期をせずに別々のスレッドからSubscriberのonNextメソッドにデータを渡すことは安全でない通知と見なされます。

安全ではない通知
安全ではない通知

 そのため、もし、データの生成を複数スレッドから行ったとしても、SubscriberのonNextメソッドに通知するデータを渡す際は同期してから渡さなくてはいけません。

安全な通知
安全な通知

 しかし、RxJavaを使う上で、Observableにこのような同期処理を開発者自身が実装することはバグを生み出したり、非効率な同期を行いパフォーマンスを逆に下げたりする危険性があるので避けた方がよいでしょう。そのような処理が必要な場合はそもそもの設計を見直すか、RxJavaに用意されている複数のObservableを一つに結合するメソッドなどをうまく使って処理を行うようにしたほうが安全です。この結合するメソッドを使う場合、各Observableが異なるスレッド上で処理をしていたとしても、結合されたObservableは内部で同期を取り安全に通知を行うので、開発者が同期についての実装を行う必要がなくなります。

 そして、Observableが全てのデータを通知し、処理が最後まで正常に行われたら、SubscriberのonCompletedメソッドを呼んで、Observerに処理が完了したことを通知します。このonCompletedメソッドは処理が正常に完了したことを示すメソッドであり、それ以降データは通知されない前提になっています。ですので、SubscriberのonCompletedメソッドを呼んだ後は何も処理をせず終了するようにしてください。もし、onCompletedメソッドを呼んだ後に何らかの後処理をした場合、そこでエラーが発生しても、そのエラーはObserverには通知されず、何もそのエラーに対する処理が行われない可能性が高くなります。

 もし、Observableの処理の途中でエラーが発生した場合は、SubscriberのonErrorメソッドを呼びます。このonErrorメソッドはObserverにエラーが発生したことを通知するメソッドであり、引数にエラーのオブジェクト(Throwable)を渡すことでObserverにそのエラーのオブジェクトが通知されます。RxJavaではSubscriberのonErrorメソッドが呼ばれることはObsrvableの処理が異常終了したことを示し、その後はonCompletedメソッド同様にObservableでは処理を行わないことが前提になっています。

 また、このサンプルでは分かりやすさを優先するため割愛しましたが、本来はSubscriberで通知処理を行う前に購読解除(unsubscribe)されていないか確認をしないといけません。つまり、Observableが通知を行う際に、通知を受け取るObserverから購読解除されて通知を行う相手がいなくなっていれば、それ以上の通知を続けることは不要になります。

 そして、Observableには大きく分けて2つの種類があります。一つは"Cold"なObservabeと呼ばれるもので、もう一つは"Hot"なObservableと呼ばれるものです。この二つの違いを簡単に説明すると、"Cold"なObservableは必ず一つのObserverにしか通知しないのに対して、"Hot"なObservableは複数のObserverに通知することが可能な点です。これは"Cold"なObservableが処理の途中で別のObserverに購読された場合、既に行っている処理のタイムラインとは別のタイムラインを生成して新しく購読したObserverに通知を行う性質によります。そのためObserverが購読するたびに新しい処理を最初から始めるので、"Cold"なObservableは必ず一つのObserverにしか通知できません。これに対し"Hot"なObservableが処理の途中で別のObserverに購読されたら、既に実行されている処理のタイムラインを通じて新たなObserverにデータを通知します。つまり、前から購読しているObserverにも後で購読し始めたObserverにも同じデータを通知します。そのため、"Hot"なObservableは複数のObserverに対して同じデータを通知することが可能になります。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5