CodeZine(コードジン)

特集ページ一覧

簡単なサンプルを作って学ぶRxJava(1.x)

RxJavaによるリアクティブプログラミング入門(2)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/09/21 14:00

目次

最初のサンプルの作成

 今回作るサンプルは、"Hello, World!"と"こんにちは、世界!"を通知して、受け取ったデータを標準出力し、すべてのデータを受け取った後に完了時の処理として"完了しました"と出力するものです。Observableがデータを通知し、Observerが各通知を受け取り処理します。今回はObservableが"Cold"のため、ObserverがObservableを購読すると処理を開始します。今回の処理は非同期で行わず、メインのスレッドのみで実行されています。このサンプルが行う処理の流れは次のとおりです。

サンプルのシーケンス図
サンプルのシーケンス図
  1. ObserverがObservableを購読し、Observableの処理を開始する。
  2. Observableが文字列"Hello, World!"を通知する。
  3. Observerがデータを受け取り、"Hello, World!"と標準出力する。
  4. Observableが文字列"こんにちは、世界!"を通知する。
  5. Observerがデータを受け取り、"こんにちは、世界!"と標準出力する。
  6. すべてのデータを通知した後にObservableが完了(onCompleted)したことを通知する。
  7. 完了の通知を受け取ったObserverが"完了しました"と標準出力する。

 ちなみに後で非同期で処理を行った際の動作を確認するため、標準出力を行う際に実行されているスレッドの名前も出力しています。

// あいさつの言葉を標準出力するサンプル
public static void main(String[] args) throws Exception {
  
  // あいさつの言葉を通知するObservableの生成
  Observable<String> observableGreeting =
      Observable.create(new OnSubscribe<String>() {
        
        @Override
        public void call(Subscriber<? super String> subscriber) {
          // 購読解除されている場合は処理をやめる
          if (subscriber.isUnsubscribed()) {
            return;
          }
          
          // 1回目の通知をする
          subscriber.onNext("Hello, World!");
          // 2回目の通知をする
          subscriber.onNext("こんにちは、世界!");
          
          // 購読解除されていない場合
          if (!subscriber.isUnsubscribed()) {
            // 完了したことを通知する
            subscriber.onCompleted();
          }
        }
      });
      
  // Observableを購読し処理を開始する
  observableGreeting.subscribe(new Observer<String>() {
    
    // Observableからのデータを受け取った際の処理
    @Override
    public void onNext(String item) {
      // 実行しているThread名の取得
      String threadName = Thread.currentThread().getName();
      // Observableからのデータをそのまま標準出力する
      System.out.println(threadName + ": " + item);
    }
    
    // Observableから完了を通知された際の処理
    @Override
    public void onCompleted() {
      // 実行しているThread名の取得
      String threadName = Thread.currentThread().getName();
      System.out.println(threadName + ": 完了しました");
    }
    
    // Observableからエラーを通知された際の処理
    @Override
    public void onError(Throwable e) {
      e.printStackTrace();
    }
  });
}
実行結果
main: Hello, World!
main: こんにちは、世界!
main: 完了しました

 それでは、サンプルでは何をやっているのか見てみましょう。まずはデータを通知するObservableについて見ていきます。

Observable

 Observableが行っている処理は次のようになっています。

// あいさつの言葉を通知するObservableの生成
Observable<String> observableGreeting =
  Observable.create(new OnSubscribe<String>() {
    
    @Override
    public void call(Subscriber<? super String> subscriber) {
      // 購読解除されている場合は処理をやめる ・・・(1)
      if (subscriber.isUnsubscribed()) {
        return;
      }
      
      // 1回目の通知をする ・・・(2)
      subscriber.onNext("Hello, World!");
      // 2回目の通知をする ・・・(3)
      subscriber.onNext("こんにちは、世界!");
            
      // 購読解除されていない場合 ・・・(4)
      if (!subscriber.isUnsubscribed()) {
        // 完了したことを通知する
        subscriber.onCompleted();
      }
      
      // 完了通知後は何もしない ・・・(5)
    }
  });

 実際にデータを通知する処理は、createメソッドの引数であるOnSubscribeインタフェースのcallメソッドで行われており、callメソッドの引数であるSubscriberを通じてObserverに通知を行っています。

 まず、このcallメソッド内で、(1)で最初にObservableが購読解除(unsubscribe)されていないかをSubscriberのisUnsubscribedメソッドを使って確認し、購読解除されていたら処理を終えるようにしています。この確認処理は、もしすでに購読解除されているにも関わらず処理を続行した場合、Observerにデータが通知され続けることになるのを避けるために行っています。Observerが購読解除したにも関わらずデータを受け続けることは、無駄にCPUやメモリのリソースを消費することになってしまいます。そのため、購読解除されたら処理を終えるようにしています。このサンプルでは、そもそも購読解除を行わないのでObservableが購読解除されることはありませんが、仮に後で購読解除された場合を考えて、購読解除された際の処理を明示しておいた方が良いでしょう。

 次にObservableは(2)(3)でデータを通知する処理を行っています。createメソッド内ではSubscriberのonNextメソッドの引数に通知するデータを渡すことで、そのデータをObserverに通知することができます。ここでは、"Hello, World!"と"こんにちは、世界!"の文字列をonNextメソッドを呼んだ順に通知しています。

 次にすべてのデータを通知し終えた後にSubscriberのonCompletedメソッドを呼ぶことでObserverに処理が完了したことを通知しています。(4)ではonCompletedメソッドを呼ぶ前に購読解除されていないかを確認し、購読解除されていない場合のみ完了を通知するようにしています。ここもまた、購読解除のチェックをしていなければ、購読解除を行ってもObserverに完了が通知されてしまうことになります。

 そして最後の(5)ではonCompletedメソッドを呼んだ後は何もしていない点に注目してください。createメソッドを実装する際の重要な点として、RxJavaでは完了の通知後にObserverには何も通知しないことになっています。そのため、今回のサンプルではonCompletedメソッドを呼んだ後に何も処理をしないようにしています。これは、もしonCompletedメソッドを呼んだ後に、何らかの処理を行いエラーが発生しても、Observerにはエラーの通知が届かず適切な処理が行えない可能性があるためです。もし、すべてのデータを通知した後に何らかの後処理を行わないといけない場合は、onCompletedメソッドを呼ぶ前にその処理を行い、処理が終わった後にonCompletedメソッドを呼ぶようにしたほうが良いでしょう。

 また、今回は使っていないですが、もし何らかのエラーをキャッチしないといけない場合、SubscriberのonErrorメソッドにそのエラーオブジェクトを渡すことで、Observerにエラーの通知が行え、そのエラーオブジェクトを渡すことができます。このonErrorメソッドを呼んだ後はonCompletedメソッドと同様にObserverには何も通知されない前提になっているので、速やかに処理をやめるようにしてください。

 また、Observableのcreateメソッドの引数であるOnSubscribeインタフェースは関数型インタフェースなので、ラムダ式が使える環境の場合、ラムダ式を使って実装することも可能です。

Observable<String> observableGreeting =
  Observable.create(subscriber -> {
      …略
  });

 さて、Observableでデータを通知する準備ができたら、そのデータをObserverに購読(subscribe)させなければなりません。Observerに購読させるには、Observableのsubscribeメソッドを使います。そのsubscribeメソッドの引数にObserverを渡すことで、その引数のObserverはObservableからの通知を受け取れるようになります。そしてObservableが"Cold"である場合、subscribeメソッドが呼ばれると、Observableは処理を開始しObserverにデータを通知します。

Observer

 それでは、通知されたデータを受け取るObserverを見てみましょう。Observerは次の3つのメソッドの実装が必要になります。

new Observer<String>() {
  
  // Observableからのデータを受け取った際の処理
  @Override
  public void onNext(String item) {
    …略
  }
  
  // Observableから完了を通知された際の処理
  @Override
  public void onCompleted() {
    …略
  }
  
  // Observableからエラーを通知された際の処理
  @Override
  public void onError(Throwable e) {
    …略
  }
})

 まず、Observableからデータが通知され、そのデータを受け取るとObserverのonNextメソッドが実行されます。onNextメソッドの引数itemはObservableから通知されたデータです。今回はObservableのcreateメソッド内でSubscriberのonNextメソッドの引数に渡されたデータがそのままObserverに通知されています。Observerは自身のonNextメソッド内で受け取ったこのデータを使って何らかの処理を行います。今回のサンプルでは実行されているスレッド名とともに受け取ったデータを標準出力しています。

 そして、すべてのデータを通知し終えるとObservableは最後に完了(onCompleted)を通知します。Observableから完了が通知されると、ObserverはonCompletedメソッドを実行します。今回のサンプルでのonCompletedメソッドでは、実行されているスレッド名とともに"完了しました"と標準出力しています。そして、データを通知するObservableが正しく実装されている場合、このonCompletedメソッドが呼ばれた後は、他のonNextメソッドやonErrorメソッドが呼ばれることはありません。

 最後にObservableからエラーを通知もしくはエラーが発生した際にObserverのonErrorメソッドが呼ばれます。ObserverのonErrorメソッドの引数にはObservableで発生したエラーオブジェクト(Throwable)が渡されてきます。そして、Observerはこのエラーオブジェクトより判断して適切なエラー処理を行います。今回はスタックトレースの出力のみ行っています。onErrorメソッドもonCompletedメソッドと同様に、呼ばれた後はObserverの他のメソッドが呼ばれることはありません。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5