CodeZine(コードジン)

特集ページ一覧

RxJava 2.xを用いた簡単なサンプルプログラムの実装

RxJavaによるリアクティブプログラミング入門(4)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/11/09 14:00

目次

subscribeメソッド

 RxJava 2.xのSubscriberを引数に受け取るsubscribeメソッドは、戻り値を返さないようにRxJava 1.xから変更されています。これはReactive Streamsの仕様に沿ったもので、このsubscribeメソッドを呼んだ場合は、Subscriberの内部で購読の解除を行うような設計になっています。

 例えば先ほどのサンプルを使って、購読を開始してから500ミリ秒以上経過していたら購読を解除するような処理を行うようにしてみましょう。その場合、次のようにSubscriberのonNextメソッドでdisposeメソッドを呼びます。

途中で購読を解除するサンプル
public static void main(String[] args) throws Exception {
  
  // あいさつの言葉を通知するFlowableの生成
  Flowable<String> flowable = Flowable.create(emitter -> {
    …… 略
  }, BackpressureStrategy.BUFFER); // 超過したデータはバッファする
  
  flowable
      // Subscriberの処理を別スレッドで行うようにする
      .observeOn(Schedulers.computation())
      // 購読する
      .subscribe(new ResourceSubscriber<String>() {
        
        /** 購読の開始時間 */
        private long startTime;
        
        // 購読が開始された際の処理
        @Override
        protected void onStart() {
          // 購読の開始時間を取得
          startTime = System.currentTimeMillis();
          // データ数のリクエスト
          request(Long.MAX_VALUE);
        }
        
        // データを受け取った際の処理
        @Override
        public void onNext(String data) {
          // 購読開始から500ミリ秒を過ぎた場合は購読を解除する
          if ((System.currentTimeMillis() - startTime) > 500L) {
            dispose();  //  購読を解除する
            System.out.println("購読解除しました");
            return;
          }
          
          // 重い処理をしているとみなして1000ミリ秒待機 
          // ※ 本来はThread.sleepは呼ぶべきではない
          try {
            Thread.sleep(1000L);
          } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
          }
          
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          // 受け取ったデータを出力する
          System.out.println(threadName + ": " + data);
        }
        
        …… 略
      });
  
  // しばらく待つ
  Thread.sleep(1500L);
}

 このサンプルでは生産者と消費者の処理スピードのギャップを作るため、SubscriberのonNextメソッドでスレッドをいったん停止しています。この箇所はそこで何かの重い処理をしていると考えてください。これを実行すると次のような結果が出力されます。

実行結果
RxComputationThreadPool-1: Hello, World!
購読解除しました

 ※非同期処理のため実行するたびに結果は変わる可能性があります。

 この時のSubscriberのonNextメソッドの処理の流れは次のようになっています。

  1. データ「Hello, World!」を受け取る。
  2. 購読開始から500ミリ秒より時間が経過しているかチェックする。
  3. 経過していないので1000ミリ秒待機し受け取ったデータを出力する。
  4. データ「こんにちは、世界!」を受け取る。
  5. 購読開始から500ミリ秒より時間が経過しているかチェックする。
  6. 経過しているので購読を解除し、onNextメソッドの処理を止める。
  7. 購読が解除されているので完了の通知が来なくなる。

 ここで特に認識しておくべき点は、500ミリ秒を経過したら購読を解除しているのではなく、onNextメソッドが呼ばれたタイミングで500ミリ秒を経過したかの判断をしている点です。つまり、500ミリ秒をすぎていてもonNextメソッドが呼ばれなければ購読が解除されることはありません。

 また、subscribeメソッドにはこのReactive Streamsの仕様に沿ったものの他に、Disposableを戻り値に返す関数型インターフェースを引数に取るsubscribeメソッドも用意しています。この引数となる関数型インターフェースには各通知時の処理が定義されています。そして、このsubscribeメソッドを使う場合は、戻り値のDisposableを使ってSubscriberの外から購読を解除することが可能になります。

Disposableを使って購読を解除する例
…… 略

// 購読を開始する……(1)
Disposable disposable =
    flowable.subscribe(data -> System.out.println("data=" + data));

…… 略

// 購読を解除する……(2)
disposable.dispose();

…… 略
  1. 関数型インターフェースを引数に取るsubscribeメソッドを使い、戻り値としてDisposableを取得する。
  2. Disposableのdisposeメソッドを呼び出し購読を解除する。

 このようにRxJavaでは、引数なしや引数が関数型インターフェースのsubscribeメソッドが用意されており、Disposableを返すものが用意されています。このようなsubscribeメソッドは次のものになります。

Disposableを返すsubscribeメソッド
戻り値の型 メソッド 説明
Disposable subscribe() Flowableの処理だけ行いSubscriberは何もしない。
Disposable subscribe(Consumer<? super T> onNext) データの通知(onNext)を受け取った時の処理のみ引数で定義してあるように行う。
Disposable subscribe(Consumer<? super T>onNext, Consumer<? super Throwable> onError) データの通知(onNext)とエラーの通知(onError)を受け取った時の処理のみ引数で定義してあるように行う。
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) データの通知(onNext)とエラーの通知(onError)と完了の通知(onComplete)を受け取った時の処理のみ引数で定義してあるように行う。
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe) データの通知(onNext)とエラーの通知(onError)と完了の通知(onComplete)を受け取った時の処理を引数で定義してあるように行い、さらに購読開始時の処理(onSubscribe)も定義してあるように行う。

 これらのsubscribeメソッドでは、デフォルトでリクエストするデータ数にはLong.MAX_VALUEが設定されています(引数にonSubscribe時の関数型インターフェースを取るものを除く)。そのためFlowableの通知に対しデータ数の制限はなくなり、再度データ数をリクエストする必要はなくなります。

 さらにRxJava 2.xではこれらのsubscribeメソッドに加え、新たに購読を行うためのメソッドとして、Subscriberを引数に取り、戻り値も返すsubscribeWithメソッドを用意しています。このメソッドの実装は次のようになっており、引数にSubscriberを渡すと内部でそのSubscriberをsubscribeメソッドに渡して実行し、戻り値としてその引数を返すようになっています。

subscriberWithメソッドの実装
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
  subscribe(subscriber);
  return subscriber;
}

 これは一見Subscriberを返してどうするのかと思うかもしれませんが、引数にResourceSubscriberやDisposableSubscriberなどのDisposableを実装しているSubscribeを渡すことで戻り値としてDisposableを受け取ることができます。

subscriberWithメソッドを使ってDisposableを戻り値を取得する例
Disposable disposable =
    flowable.subscribeWith(new ResourceSubscriber(){
      …… 略
    });

 こうすることでRxJava 1.xのように購読後に購読解除が行えるDisposableを取得し、Subscriberの外部から購読の解除ができるようになります。また、外部からdisoposeメソッドが呼ばれるということは非同期で呼ばれる可能性もあるということです。ResourceSubscriberやDisposableSubscriberなどのRxJavaで実装されたSubscriberのクラスは、非同期で呼ばれることにも対応しているので、subscribeWithメソッドを使う際は、これらのRxJava側で実装されたクラスを使う方が良いでしょう。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5