CodeZine(コードジン)

特集ページ一覧

リアクティブプログラミングとRxJavaの概要

RxJavaによるリアクティブプログラミング入門(1)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/09/01 14:00

目次

Observer

 ObserverインタフェースはObservableの通知を受けて、通知の種類ごとの処理を行うためのインタフェースです。Observerは次の通知を受け取り、各通知ごとのメソッドを持っています。

  • データ(onNext)
  • 完了(onCompleted)
  • エラー(onError)
Observerの実装例
new Observer<T>() {
  
  @Override
  public void onNext(T item) {
    …Observableから通知されたデータ(item)を受け取った際の処理
  }
  
  @Override
  public void onCompleted() {
    …Observableから完了の通知を受け取った際の処理
  }
  
  @Override
  public void onError(Throwable e) {
    …Observableからエラーの通知を受け取った際の処理
  }
}

 ObserverのonNextメソッドは、Observableが通知したデータを受け取り、そのデータを使って処理をするメソッドです。onNextメソッドの引数はObservableのcreateメソッドの内部でSubscriberのonNextメソッドに渡したデータが通知されます。Observerはこのデータを受け取って何らかの処理を行います。

 ObserverのonCompletedメソッドは、Observableが完了を通知した際の処理を行うメソッドです。Observableのcreateメソッドの内部にてSubscriberのonCompletedメソッドを呼ぶことで、ObserverのonCompletedメソッドが実行されます。

 ObserverのonErrorメソッドは、Observableからエラーが通知された場合に処理を行うメソッドです。Observableからのエラー通知は2種類あり、一つはObservableのcreateメソッドの内部にて明示的にSubscriberのonErrorメソッドを呼んだ場合で、もう一つはcreateメソッド内でcatchされないエラーが発生した場合です。この時、ObserverのonErrorメソッドの引数にはObservableが通知したエラーのオブジェクトが渡されます。

 しかし、Observable内で発生するcatchされないエラーの中にはStackOverflowErrorやVirtualMachineErrorのように継続すること自体が好ましくないエラーもあります。そのようなエラーは通知ではなく、そのエラーをそのままスローすることで処理を終了します。

Observableのsubscribeメソッド

 ObservableとObserverの準備ができたらObservableのsubscribeメソッドの引数にObserverを渡してObservableを購読します。そうすることでObservableが通知するデータをObserverが受け取ることが可能になります。また、subscribeメソッドを呼ぶと戻り値としてSubscriptionを受け取ります。

subscribeメソッドの呼び出し例
Subscription subscription = observable.subscribe(observer);

 subscribeメソッドにはObserverを引数として受け取るもののほか、通知時の処理が実装された関数型インタフェースを引数にとるものなど様々なメソッドが用意されていますが、ここでは基本的なObserverを引数に取るものを見ていきましょう。

 subscribeメソッドにObserverを渡すと、ObservableはそのObserverに対して生成したデータを通知できるようになります。ただし、subscribeメソッドを呼ぶことによってObservableが処理を開始するかどうかは、そのObservableが"Cold"か"Hot"かによって変わってきます。

 ColdなObservableの場合、subscribeメソッドが呼ばれると、Observableは処理を開始します。また、Observableが処理を行っている最中に、別のObserverがそのObservableに対してsubscribeメソッドを呼んだ場合、既に行っている処理はそのまま継続したまま、新しい処理をその別のObserverに対して最初から開始します。つまり、Observerが購読するたびに新しい通知の処理ができることになります。

 例えば1、2、3と順にデータを通知する"Cold"なObservableに対して、あるObserverが購読したとします。そして、そのObservableが1を通知した後に別のObserverがそのObservableを購読した場合、最初のObserverは順に1、2、3とデータを受け取り、後で購読したObserverは購読したタイミングから順に1、2、3とデータを受け取ることになります。

 これに対しHotなObservableの場合、Observableの処理を開始するのかどうかはHot Observableの種類によって異なり、開始するものもあれば開始しないものもあります。また、既に処理を開始しているHot Observableに対し別のObserverがsubscribeメソッドを呼んだ場合、そのObservableは最初から処理をするのではなく、処理の途中からそのObserverにも通知を行います。

 例えば先ほどの1、2、3と順にデータを通知するObservableが"Hot"なObservableだった場合、そのObservableに対してObserverが購読したとします。そして、そのObservableが1を通知した後に別のObserverがそのObservableを購読した場合、この"Hot"なObservableが通知したデータをキャッシュしていないタイプだと、最初のObserverは1、2、3とデータを受け取り、後で購読したObserverは2、3とデータを受け取ることになります。

 そしてObservableのsubscribeメソッドは戻り値にSubscriptionを返します。これは他の引数をとるsubscribeメソッドも同様でSubscriptionを戻り値として返します。このSubscriptionは途中で購読を解除するためのunsubscribeメソッドを持っていて、unsubscribeメソッドを呼ぶことでObservableの処理の停止を促せます。促せるというのはunsubscribeメソッドを呼んでもObservableが途中で処理を止めるのが危険な場合もあるため、購読の解除をリクエストしても安全な状態になるまで処理が続くこともあるためです。そして、unsubscribeメソッドを呼ばれて処理を止めたObservableは基本的に完了もエラーも通知しません。

 また、Observableのcreateメソッドのように、実装者が直接どのような通知を行うのかを実装したObservableを生成している場合、購読解除された際の処理がプログラムに記述されていないと、unsubscribeメソッドが呼ばれても処理は中断されず最後まで通知が行われてしまいます。

 このことは、例えば、Observableが膨大の数のデータ通知している場合に、unsubscribeメソッドを呼んでその処理を途中で止めようとしても、そもそも購読解除時の処理が実装されていないので完了もしくはエラーが発生するまで、このObservableは処理を続けるようなことになってしまいます。

 このようなことを避けるため、Observableが購読解除されている状態なら処理を中断し、それ以上の処理を行わないように実装しておく必要があります。購読解除されたかどうかはSubscriberのisUnsubscribedメソッドで確認することができます。trueが返る場合はObservableが購読解除されたことを示します。購読解除時の処理を追加した場合は次のようになります。

購読解除(unsubscribe)時の処理を追加した場合
Observable.create(new OnSubscribe<T>() {
  
  @Override
  public void call(Subscriber<? super T> subscriber) {
    try {
      while(true){
        // 購読解除されているなら処理をやめる
        if (subscriber.isUnsubscribed()){
          return;
        }
        
        …通知するデータの生成処理など
        
        // データを通知する
        subscriber.onNext(item);
        
        // 条件によってループを抜ける
        if (何らかの条件){
          break;
        }
      }
      
      // 購読解除されていない場合だけ実行
      if (!subscriber.isUnsubscribed()){
        // 完了したことを通知する
        subscriber.onCompleted();
      }
    } catch (Exception e) {
      // 購読解除されていない場合だけ実行
      if (!subscriber.isUnsubscribed()){
        // エラーが発生したことを通知する
        subscriber.onError(e);
      }
    }
  }
})

 また、一つのObserverが複数のObservableのデータを必要とする場合は、Observableを連結し新たなObservableを生成することで一つのObserverで対応できるようになります。Observableにはそのような連結を行うためのメソッドがあります。

 例えば、商品の価格と税率から税額を計算するような場合、ObservableのcombineLatestメソッドを使って、次のような実装で商品価格の変化や税率の変化に対応することが可能になります。

2つのObservableが通知したデータを受け取る例
Observable<BigDecimal> priceObs = …商品価格の値を通知するObservable
Observable<BigDecimal> taxRateObs = …税率の値を通知するObservable

// 2つのObservableのデータから新しいデータを通知するObservableを生成
Observable
    .combineLatest(priceObs, taxRateObs, // 結合するObservable
        // 各Observableから受け取ったデータを使って処理をする
        // 関数型インタフェース
        (price, taxRate) -> price.multiply(taxRate)
            .setScale(0, RoundingMode.DOWN))
    // 購読開始
    .subscribe( new Observer<>(){
       @Override
       public void onNext(BigDecimal tax){
         関数型インタフェースの計算結果を受け取り処理を行う
       }

       …完了時とエラー時の処理は省略
     });

 ObservableのcombineLatestメソッドは、引数に渡された複数のObservableのデータを使って新しいデータを生成し、その新しいデータを通知するObservableを生成します。引数に渡されたObservableのどれかがデータを通知したタイミングで、各Observableが最後に通知したデータを引数として関数型インタフェースに渡します。そこで定義した処理を行った結果がデータとして通知されます。

 他にも複数のObservableを連結させるメソッドがRxJavaには用意されています。これらのメソッドを使えば一つのObservableになるので、その一つになったObservableに対しObserverが購読することで、複数のObservableの通知が必要であっても、一つのObserverで通知を受け取ることが可能になります。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5