CodeZine(コードジン)

特集ページ一覧

RxJava 2.xを用いた簡単なサンプルプログラムの実装

RxJavaによるリアクティブプログラミング入門(4)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/11/09 14:00

目次

Observable(バックプレッシャー機能なし)を使った場合のサンプル

 RxJava 2.xのObservableとObserverの関係はFlowableとSubscriberの関係とほぼ同じですが、Reactive Streamsを実装しておらずバックプレッシャー機能がない点が異なっています。そして、クラスの構成や用意されているメソッドなどもバックプレッシャーに関わる機能を除いてFlowableとほぼ同じようになっているので、Flowableの使い方を知っていればObservableも同様に使うことができるようになります。それではObservableを使ったサンプルを見てみましょう。

 ここでのサンプルもFlowableのサンプルと同様に「Hello, World!」と「こんにちは、世界!」のデータを通知し、最後に完了したことを通知しています。そして通知を受け取るObserverはその通知の内容を出力するようにします。

Observableを使った場合
public static void main(String[] args) throws Exception {
  
  // あいさつの言葉を通知するObservableの生成
  Observable<String> observable =
      Observable.create(new ObservableOnSubscribe<String>() {
        
        @Override
        public void subscribe(ObservableEmitter<String> emitter)
            throws Exception {
          
          // 通知するデータ
          String[] datas = { "Hello, World!", "こんにちは、世界!" };
          
          for (String data : datas) {
            // 解除されている場合は処理をやめる
            if (emitter.isDisposed()) {
              return;
            }
          
            // データを通知する
            emitter.onNext(data);
          }
          
          // 完了したことを通知する
          emitter.onComplete();
        }
      });
  
  observable
      // 消費する側の処理を別スレッドで行うようにする
      .observeOn(Schedulers.computation())
      // 購読する
      .subscribe(new Observer<String>() {
        
        // subscribeメソッドが呼ばれた際の処理
        @Override
        public void onSubscribe(Disposable disposable) {
          // 何もしない
        }
        
        // データを受け取った際の処理
        @Override
        public void onNext(String item) {
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          // 受け取ったデータを出力する
          System.out.println(threadName + ": " + item);
        }
        
        // 完了を通知された際の処理
        @Override
        public void onComplete() {
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          System.out.println(threadName + ": 完了しました");
        }
        
        // エラーを通知された際の処理
        @Override
        public void onError(Throwable error) {
          error.printStackTrace();
        }
      });
  
  // しばらく待つ
  Thread.sleep(500L);
}
実行結果
RxComputationThreadPool-1: Hello, World!
RxComputationThreadPool-1: こんにちは、世界!
RxComputationThreadPool-1: 完了しました

 それでは、サンプルでは何をやっているのか見てみましょう。まずはデータを通知するObservableについて見ていきます。

Observable

 Observableが行っている処理は次のようになっています。

Observable.create(new ObservableOnSubscribe<String>() {  // ……(1)
  
  @Override
  public void subscribe(ObservableEmitter<String> emitter)  // ……(2)
      throws Exception {
    
    // 通知するデータ
    String[] datas = { "Hello, World!", "こんにちは、世界!" };
    
    for (String data : datas) {
      // 解除されている場合は処理をやめる
      if (emitter.isDisposed()) {  // ……(3)
        return;
      }
    
      // データを通知する
      emitter.onNext(data);
    }
    
    // 完了したことを通知する
    emitter.onComplete();
  }
}); // ……(4)

 このObservableとFlowableの構成上の違いはほぼなく、(1)でFlowableOnSubscribeの代わりにObservableOnSubscribeを受け取るようになっています。

 それに伴い(2)で見て分かるように、subscribeメソッドの引数でFlowableEmitterの代わりにObservableEmitterを受け取るようになっています。

 また(3)では、購読が解除されたのかどうかを確認するメソッドはFlowableEmitterの場合はisCanceledメソッドだったのに対し、ObservableEmitterの場合はisDisposedメソッドになり名前が異なっている点に注意してください。ただし、処理内容は両方とも購読の解除を行います。

 さらに(4)では、バックプレッシャーがなくなったのでcreateメソッドの引数は1つになり、Flowableのcreateメソッドのようにバックプレッシャーのオプションを指定する第2引数がなくなった点が異っています。そしてバックプレッシャーがないということは、データが生産されるたびにデータが通知されることになります。

 このようにFlowableとの違いはいくつかありますが、ObservableEmitterの実装クラスはFlowableEmitterと同様に購読が解除されていたら通知を行わないようになっていたり、nullを通知するとNullPointerExceptionを発生させたりと、バックプレッシャーの機能がないことを除けばFlowableの場合とほぼ同じになります。

Observer

 Observerは通知を受け取り、その通知に対する処理を行います。ObserverはRxJava 1.xからありますが、RxJava 2.xではonSubscribeメソッドが追加されています。このメソッドはFlowableを購読するSubscriberのonSubscribeメソッド同様に購読が開始する際に呼ばれるメソッドですが、ObservableとObserver間ではバックプレッシャーの機能はなくなっているので引数にSubscriptionではなくDisposableを受け取るようになっています。それではObserverの中身を見ていきましょう。

new Observer<String>() {
  
  // 購読が開始された際の処理
  @Override
  public void onSubscribe(Disposable disposable) {
    // 何もしない……(1)
  }
  
  // データを受け取った際の処理
  @Override
  public void onNext(String item) {
    // 実行しているThread名の取得
    String threadName = Thread.currentThread().getName();
    // 受け取ったデータを出力する
    System.out.println(threadName + ": " + item);
    
    // 通知するデータ数のリクエストは行わない……(2)
  }
  
  …… 略
}

 (1)を見て分かるように、このサンプルでは購読開始時に処理を行うonSubscribeメソッドでDisposableを受け取っても、特に何もしていません。もし、購読の途中で購読を解除する必要がある場合は受け取ったDisposableをObserver内で保持しないといけませんが、今回のサンプルでは購読解除しないので何もしていません。

 さらに(2)を見て分かるように、バックプレッシャー機能がないのでデータを受け取った際の処理のみ行っています。仮に何らかのタイミングで購読を解除しないといけない場合に(1)のonSubscribeメソッドで受け取ったDisposableをObserver内で保持し、そのDisposableを使ってonNextメソッド内で購読を解除するためのdisposeメソッドを呼ぶことになります。

 また、Subscriberと同様にObserverにもResourceObserverとDisposableObserverが用意されており、基本的には次のメソッドのみ実装すればよくなります。

  1. onNext(T data)
  2. onError(Throwable error)
  3. onComplete()

 さらにResourceObserverとDisposableObserverも同様にonSubscribeメソッドがfinalでオーバーライドされており、直接Disposableにアクセスできないようになっています。代わりに購読を解除する際の次のメソッドが用意されています。

Disposableの機能を呼び出すメソッド
メソッド 説明
dispose() 購読を解除する。
isDisposed() 購読が解除されていればtrueを返す。

 そして購読開始時の処理としてonStartメソッドが用意されており、購読開始時に何か行う場合はこのメソッドをオーバーライドします。デフォルトでは次のように何も処理を行わないメソッドになっています。

ResourceObserver/DisposableObserverのonStartメソッドの実装
protected void onStart() { }

subscribeメソッド

 RxJava 2.xのObserverを引数に受け取るsubscribeメソッドは戻り値を返さないようになっています。そのため、このsubscribeメソッドを呼んだ場合はObserverの内部で購読の解除を行うような設計になっています。

 ただし、Flowableの場合と同様に、各通知時の関数型インターフェースを受け取るsubscribeメソッドや戻り値を返すsubscribeWithメソッドが用意されており、このようなsubscribeメソッドの場合は戻り値にDisposableを返します。そして、この戻り値のDisposableを使って、Observerの外から購読を解除することが可能になります。

まとめ

 今回はRxJava 2.xを使った簡単な実装を行い、RxJavaの基本的な仕組みを実装ベースで見てきました。次回はRxJavaについて何ができるのかについて、もう少し踏み込んで見ていきます。



  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5