CodeZine(コードジン)

特集ページ一覧

RxJava 2.xを用いた簡単なサンプルプログラムの実装

RxJavaによるリアクティブプログラミング入門(4)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/11/09 14:00

目次

Subscriber

 RxJava 2.xのSubscriberは、通知されたデータを受け取り、そのデータを使って何らかの処理を行うインターフェースです。さらにRxJava 2.xのSubscriberは受け取れるデータ数をリクエストする責任も持っており、このデータ数のリクエストを行わないとFlowableはデータを通知することはできません。これはReactive Streamsの仕様であり、そのため、このインターフェースはRxJavaのものではなくReactive Streamsのものになります。そして、このSubscriberのパッケージはorg.reactivestreamsです。

 今回のサンプルでは、前のRxJava 1.xを使ったサンプルと同様に、通知を受け取ったら実行されるスレッド名とともに標準出力していますが、さらにSubscriberが1件のデータの処理を終えるまで次のデータをSubscriberに通知しないようにFlowableの通知の制御(バックプレッシャー)を行っています。このときは通知するデータ数のリクエストはSubscriptionインターフェース経由で行います。そして、このSubscriptionもReactive Streamsのものであり、パッケージはorg.reactivestreamsになります。

Subscriber
new Subscriber<String>() {
  
  /** データ数のリクエストおよび購読の解除を行うオブジェクト */
  private Subscription subscription;
  
  // 購読が開始された際の処理
  @Override
  public void onSubscribe(Subscription subscription) { // ……(1)
    // SubscriptionをSubscriber内で保持する ……(2)
    this.subscription = subscription;
    // 受け取るデータ数をリクエストする ……(3)
    this.subscription.request(1L);
  }
  
  // データを受け取った際の処理
  @Override
  public void onNext(String item) {
    …… 略(受け取ったデータを使った処理をする)
    
    // 次に受け取るデータ数をリクエストする ……(4)
    this.subscription.request(1L);
  }
  
  …… 略
}

 (1)を見て分かるように、RxJava 2.xで使われているSubscriberは、RxJava 1.xのObserverと異なり新たにonSubscribeメソッドが追加されています。このメソッドは購読が開始すると最初に呼ばれるメソッドであり、引数からSubscriptionを受け取ります。このSubscriptionは通知するデータ数のリクエストおよび購読の解除を行うReactive Streamsのインターフェースです。

 (2)では、onSubscribeメソッドの引数であるSubscriptionをSubscriber内で保持するために、受け取った引数をSubscriberのインスタンス変数に設定しています。こうすることでonSubscribeメソッドが終了してもSubscriptionを使えるようにしています。

 そして(3)で、Subscriptionのrequestメソッドを使って最初に通知されるデータ数をリクエストすることで、初めてデータの通知が開始されます。今回は最初にデータを1件だけ通知するようにリクエストしています。もし、このonSubscribeメソッド内でデータ数のリクエストを行っていないと、Flowableが通知するデータ数のリクエストが来るのを待っている状態になり、通知を始めることができなくなるので注意が必要です。

 また、Subscriptionのrequestメソッドの引数には1以上の数値しか受け付けず、Reactive Streamsの仕様と異なり、0以下の数値を与えてもログの出力だけをして、このリクエストを無視します。さらに、引数の上限はLong.MAX_VALUEになっており、Long.MAX_VALUEを設定すると無制限にデータを受け取るように挙動が変わります。つまり、Long.MAX_VALUEを渡すとデータ数の制限がなくなり、今後データ数のリクエストがなくてもデータを受け取り続けることになります。

 

 さらに、ここで重要なのが、requestメソッドの呼び出しをonSubscribeメソッドの最後で行っていることです。Flowableによってはrequestメソッドの呼び出しと同時にデータの通知を始めるものもあります。そのようなFlowableの場合、reuqestメソッド以降に記述してあるコードが、完了の通知を行うまで実行されなくなる可能性があります。そのためonSubscribeメソッド内でrequestメソッドを呼ぶ際は、最後に行うようにしてください。

 (4)では、onNextメソッドでデータを受け取った際の処理を行い、最後に次のデータを1件だけ通知するようにリクエストしています。そうすることで、次にデータを受け取ってもまた通知するデータ数をリクエストするので、生産者がすべてのデータを通知するまで繰り返しデータを受け取ることができるようになります。もし、次のデータ数をリクエストしていないと、最初にリクエストした分のデータを通知した後、それ以降データが通知されなくなるので注意が必要です。

 バックプレッシャーを効かせたデータの通知はこのようになります。ただし、データ数をリクエストし、リクエストした分のデータを処理した後に再度データ数をリクエストするようなことを繰り返し行うと、何らかの問題が発生してデータ数のリクエストがされなくなり、消費者がデータを受け取れる状況にもかかわらず、生産者がデータの通知待ち状態になってしまうリスクがあります。また、生産者と消費者での処理スピードのギャップがあまりない場合や通知するデータ数が多くない場合など、特に通知するデータ数を制限する必要がない場合も多くあります。このような場合、onSubscribeメソッド内で実行するrequestメソッドにLong.MAX_VALUEを指定することで、生産者はデータ数の制限なくデータを通知できるようになり、このようなリスクを減らすことができることになります。

データ数での制限をさせない場合
new Subscriber<String>() {
  
  …… 略
  
  @Override
  public void onSubscribe(Subscription subscription) {
    …… 略
    // データ数を制限することなくデータを通知するようにリクエストする
    subscription.request(Long.MAX_VALUE);
  }

  @Override
  public void onNext(String item) {
    受け取ったデータの処理のみ行い、データ数のリクエストは行わない
  }
  
  …… 略
}

 また、このサンプルでは取り扱いませんでしたが、onSubscribeメソッドで受け取るSubscriptionは購読を解除する機能も持っています。もしSubscriberが購読を途中で解除する必要がある場合、Subscriptionのcancelメソッドを呼ぶことで通知処理を終わらせるように促すことができるようになります。

 さらにRxJavaでは、onSubscribeメソッドでLong.MAX_VALUEをリクエストすることを内部で行うResourceSubscriberクラスが用意されています。このResourceSubscriberは抽象クラスであり、基本的にはRxJava 1.xのObserverと同様に次のメソッドのみ実装すればよくなり、データ数のリクエストを忘れることを防げます。

  • onNext(T data)
  • onError(Throwable error)
  • onComplete()

 また、ResourceSubscriberではonSubscribeメソッドの実装がfinalで定義されていて、受け取ったSubscriptionが隠蔽されています。そのため、ResourceSubscriberを使う場合は直接Subscriptionにアクセスすることはできないようになっています。

 そのため、onSubscribeメソッドをオーバーライドすることは不可能なのですが、onSubscribeメソッド内から呼ばれているResourceSubscriberのonStartメソッドをオーバーライドすることで初期時のリクエストを行うことができます。ResourceSubscriberでは次のように実装されています。

ResourceSubscriberのonStartメソッドの実装
protected void onStart() {
    request(Long.MAX_VALUE);
}

 このonStartメソッドで呼ばれているrequestメソッドは、ResourceSubscriberの内部でSubscriptionを経由してデータ数をリクエストできるようにするメソッドで、ResourceSubscriberを使う場合は、このrequestメソッドを呼び出してデータ数のリクエストを行うことができるようになります。

 また、onStartメソッドでもonSubscribeメソッドと同様にrequestメソッドの呼び出しと同時にデータの通知を始め、reuqestメソッド以降に記述してあるコードが、完了の通知を行うまで実行されなくなる可能性があるため、onStartメソッド内で最後にrequestメソッドを呼ぶようにしてください。

 さらにResourceSubscriberはDisposableを実装しているため、購読を解除するdisposeメソッドも実装しています。このdisposeメソッドは内部でSubscriptionのcancelメソッドを呼んでいるので、Subscriptionに直接アクセスできなくてもdisposeメソッド経由で購読の解除を行うことを可能にしています。

Subscriptionの機能を呼び出すメソッド
メソッド 説明
request(long) 通知するデータ数をリクエストする。
dispose() 購読を解除する。Subscriptionのcancelメソッドが呼ばれる。

 また、RxJavaではResourceSubscriberと同じようなSubscriberとしてDisposableSubscriberが用意されています。このDisposableSubscriberもonSubscribeメソッドをfinalでオーバーライドして、データ数をLong.MAX_VALUEでリクエストし、Subscriptionを隠蔽しています。このDisposableSubscriberの使い方はResourceSubscriberとほぼ同じになります。今回は特に解説しませんが、両者の主な違いはResourceSubscriberが他のDisposableを保持することができるのに対し、DisposableSubscriberは他のDisposableを保持できない点にあります。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5