CodeZine(コードジン)

特集ページ一覧

RxJava 2.xを用いた簡単なサンプルプログラムの実装

RxJavaによるリアクティブプログラミング入門(4)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/11/09 14:00

目次

Flowable

 Flowableが行っている処理は次のようになっています。

// あいさつの言葉を通知するFlowableの生成
Flowable<String> flowable =
    Flowable.create(new FlowableOnSubscribe<String>() { // ……(1)
      
      @Override
      public void subscribe(FlowableEmitter<String> emitter)
          throws Exception {  // ……(2)
        
        String[] datas = { "Hello, World!", "こんにちは、世界!" };
        
        for (String data : datas) {
          // 購読が解除された場合は処理をやめる……(3)
          if (emitter.isCancelled()) {
            return;
          }
          
          // データを通知する……(4)
          emitter.onNext(data);
        }
        
        // 完了したことを通知する……(5)
        emitter.onComplete();
      }
    }, BackpressureStrategy.BUFFER);  // 超過したデータはバッファする……(6)

 (1)を見て分かるように、実際にデータを通知する処理はcreateメソッドの引数であるFlowableOnSubscribeインターフェースのsubscribeメソッドで行われています。そしてこのsubscribeメソッドの引数であるFlowableEmitterインターフェースを通じてSubscriberに通知を行っています。

 このFlowableEmitterは通知メソッド(onNextメソッド、onErrorメソッド、onCompleteメソッド)の内部で購読が解除されたか確認するようになっています。そのため、RxJava 1.xの通知メソッドと異なり、購読が解除されている状態でFlowableEmitterのonNextメソッドなどの通知メソッドを呼んでも、Subscriberには通知が行われないようになっています。

 また、FlowableOnSubscribeは関数型インターフェースであるため、ラムダ式を使って次のように記述することも可能です。

// あいさつの言葉を通知するFlowableの生成
Flowable<String> flowable = Flowable.create(emitter -> {
   String[] datas = { "Hello, World!", "こんにちは、世界!" };
  …… 略
}, BackpressureStrategy.BUFFER);

 さらに(2)を見てみると、subscribeメソッドがRxJava 1.xと異なりExceptionをthrowするようになっています。このことにより処理中に例外をcatchする必要がなくなり、発生した例外はFlowable内部の呼び出し元でcatchされ、致命的なエラーでない限りはSubscriberにエラーの通知を行うようになっています。

FlowableCreate.javaの例外処理のための実装部分
public final class FlowableCreate<T> extends Flowable<T> {
  
  final FlowableOnSubscribe<T> source;
  
  …… 略
  
  // subscribeメソッド内で呼ばれるメソッド
  @Override
  public void subscribeActual(Subscriber<? super T> subscriber) {
    BaseEmitter emitter;
    …… 略
    try {
      source.subscribe(emitter);  // 実装したcreateメソッド内の処理を行う
    } catch (Throwable ex) {
      Exceptions.throwIfFatal(ex);
      emitter.onError(ex);
    }
    …… 略
  }
  …… 略
}

 一応、createメソッド内でエラーの通知を明示的に行わなくても、エラーが発生した場合はSubscriberに通知されますが、FlowableEmitterのonErrorメソッドを使って明示的にエラーを通知することも可能です。ただし、その場合は速やかに処理をやめ、エラー通知後に何も他の通知が行われないように注意しないといけません。

 (3)では、購読が解除された場合はそれ以上処理を行わないようにしています。RxJava 2.xでは購読が解除された場合、onNextメソッドなどの通知メソッドを呼び出しても、通知を行わないようになっていますが、createメソッドを使って実装している場合、購読が解除された後もFlowableの処理を続けるかどうかは実装者の責任になります。つまり、RxJavaがしてくれるのは通知メソッドを呼び出しても通知を行わないことだけで、処理自体を止めてくれるわけではありません。そのため、createメソッドを使って大量のデータをループしながら通知するFlowableを生成した場合や、完了することなく永遠にデータを通知するようなFlowableを生成した場合、購読解除された際の処理を自分で実装していないと処理が実行され続け無駄にリソースを費やすことになります。

 続いて(4)では、FlowableEmitterのonNextメソッドに引数にデータを渡すことで、そのデータをSubscriberに通知しています。ただし、この時に注意すべき点として、RxJava 1.xと異なりRxJava 2.xからはnullが通知できなくなったことがあります。もしonNextメソッドの引数にnullを渡した場合、NullPointerExceptionが発生します。

 (5)では、すべてのデータを通知した後にFlowableEmitterのonCompleteメソッドを呼ぶことでSubscriberにFlowableの処理が完了したことを通知しています。このメソッドも購読が解除されている場合は通知が行われないように挙動がRxJava 1.xから変更されており、さらにメソッド名もRxJava 1.xから変更され最後の「d」がなくなっています。そして、完了の通知をした後は他の通知をすべきではないので、onCompleteメソッドを呼んだ後は何もしていないこともポイントの一つになります。

 最後に(6)ではFlowableのcreateメソッドの第2引数にどのようなバックプレッシャーを行うのかのオプションを指定する列挙型のBackpressureStrategyを指定します。このBackpressureStrategyは2.0.0-RC4までBackpressureModeと呼ばれていたもので、2.0.0-RC5よりBackpressureStrategyにマージされました。今回は生成されたデータを通知するまですべて貯めておく(バッファする)BackpressureStrategy.BUFFERを設定しています。このモードは、Flowableによるデータの生成スピードがSubscriberによるデータの処理スピードより速い場合、通知できずに通知待ちとなったデータをすべてバッファし、通知できるようになるまで保持するモードです。

 BackpressureStrategyは、この他にも通知待ちのデータを破棄するBackpressureStrategy.DROPや、すべてではなく最新のデータのみ保持するBackpressureStrategy.LATESTなどがあります。また、RxJavaでは内部で通知待ちのデータをバッファするようになっており、そのバッファサイズを調整して何件までデータを保持するのかを決めています。そのため、どのデータが通知され、どのデータが破棄されるのかはこのバッファサイズが影響することになります。

BackpressureStrategy
BackpressureStrategy 説明
BUFFER 通知されるまで、すべてのデータをバッファする。
DROP データを通知できるようになるまで、新たに生成されたデータを破棄する。
LATEST 生成した最新のデータのみをバッファし、生成されるたびにバッファするデータを置き換える。
ERROR 通知待ちのデータがバッファサイズを超す場合はMissingBackpressureExceptionのエラーを通知する。
MISSING 特定の処理を行わない。主にonBackpressureで始まるメソッドを使ってバックプレッシャーのモードを設定する場合に使われる。

 また、今回のサンプルでは使わなかったのですが、FlowableEmitterには購読が解除された際に指定した処理を行うsetCancellableメソッドや、他のDisposableを設定してまとめて購読解除するためのsetDisposableメソッドもあります。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5