CodeZine(コードジン)

特集ページ一覧

RxJava 2.xで導入されたReactive Streams

RxJavaによるリアクティブプログラミング入門(3)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/10/11 14:00

目次

Reactive Streamsの構成

 Reactive Streamsの構成はRxJava 1.xの構成とかなり似ていて、生産者としてデータを生産し通知するPublisherと消費者としてデータを受け取り処理をするSubscriberとの関係で成り立っています。また、購読(subscribe)している生産者からのデータ数のリクエストや購読を解除するためのSubscriptionも存在し、Subscriptionを通して必要なデータ数のリクエストや購読を途中で解除することが可能になっています。

Reactive Streamsの構成

Reactive Streamsの構成

 このPublisherとSubscriberの関係を、前回説明したRxJava 1.xのObservableとObserverの関係と比べると次のようになります。

Reactive StreamsとRxJava 1.xの関係の比較
Reactive Streams RxJava 1.x 説明
Publisher Observable データを生産し通知するオブジェクト。
Subscriber Observer 受け取った通知をもとに処理を行うオブジェクト。RxJava 1.xではこのObserverを実装したSubscriberクラスが通知するデータ数をリクエストする機能も持つ。
Subscription Subscription 購読を解除することができるオブジェクトであり、Reactive Streamsでは通知するデータ数をリクエストする機能も持つオブジェクト。

 Reactive Streamsが提供するAPIの各インタフェースは次のように宣言されています。

Publisher.java
/** データを通知する生産者 */
public interface Publisher<T> {
  /** 通知を受け取るSubscriberを登録する */
  public void subscribe(Subscriber<? super T> subscriber);
}
Subscriber.java
/** データを受け取り処理をする消費者 */
public interface Subscriber<T> {
  /** 購読開始時の処理をする */
  public void onSubscribe(Subscription subscription);
  
  /** データの通知を受け取った際の処理をする */
  public void onNext(T item);
  
  /** エラーの通知を受け取った際の処理をする */
  public void onError(Throwable error);
  
  /** 完了の通知を受け取った際の処理をする */
  public void onComplete();
}
Subscription.java
/** 生産者と消費者をつなぐためのインタフェース */
public interface Subscription {
  /** 通知されるデータ数をリクエストする */
  public void request(long num);
  
  /** 購読をキャンセル(解除)する */
  public void cancel();
}

 Reactive Streamsの構成はRxJava 1.xと異なる部分もあり、まず、大きく異なる部分としてRxJava 1.xではSubscriptionがsubscribeメソッドの戻り値となっていたのに対し、Reactive Streamsでは、Subscriptionがデータの消費者であるSubscriberにonSubscribeメソッドの引数として渡されている点が違います。このSubscriberに新しく追加されたonSubscribeメソッドは、Publisherが購読された際に最初に通知されるイベントで一度だけしか呼ばれないものです。Reactive StreamsではSubscriptionがこのonSubscribeメソッド経由で渡ってきます。

 このSubscriptionはRxJava 1.xでは購読解除の機能しかありませんでしたが、Reactive Streamsではバックプレッシャーを実現するために、データ数をリクエストするrequestメソッドが追加されています。このrequestメソッドを通してSubscriberが処理できるデータ数をリクエストし、Publisherはリクエストされたデータ数分のデータをSubscriberに通知します。

 そのため、SubscriberはonSubscribeメソッド内でSubscriptionのrequestメソッドを呼んで最初に受け取るデータ数をリクエストします。そうすることで、Publsihserがデータの通知を開始できることになります。そして、Publsihserがリクエストした分のデータを通知した後は通知をやめ、次のリクエストが来るのを待ちます。ただし待っている間もデータを生産し続ける状態になっています。

 このように生産者がデータの通知や生成を行っている間、Subscriberは受け取ったデータを処理します。そして、リクエストした分のデータを処理した後に次に受け取るデータ数を再度リクエストします。これを繰り返すことで、Subscriberの処理ペースに合わせたデータの受け取りが可能になります。

 ちなみに、onNextメソッドでSubscriptionを使うには、onSubscribeメソッドで受け取ったSubscriptionをSubscriberの内部で保持しないといけません。

publisher.subscribe(new Subscriber<T>() {
  /** Subscriber内で保持するSubscription */
  private Subscription subscription;
  
  @Override
  public void onSubscribe(Subscription subscription) {
    // 受け取ったSubscriptionをSubscriber内に保持する
    this.subscription = subscription;
    
    // 最初に通知を受けるデータ数をリクエストする
    this.subscription.request(num);
    …略
  }
  
  @Override
  public void onNext(T item) {
    …略
    // リクエストした分のデータを処理したら次のデータ数をリクエストする
    subscription.request(num);
  }
  
  …略
});

 また、このsubscribeメソッドの戻り値がなくなり(voidになり)、onSubscribeメソッド経由でSubscriptionを受け取るようになったということは、RxJava 1.xでは購読解除を生産者と消費者の連携の外部から行っていたのに対し、Reactive StreamsではSubscriber内の閉じた環境で購読を解除する設計に変更されたことになります。

 まとめると、このPublisherとSubscriber間のプロトコルは次の4つになり、

  • onSubscribe
  • onNext
  • onError
  • onComplete

 正常系(すべてのデータを問題なく通知して完了した場合)の処理の流れは次のようになります。

Reactive Streamsの正常時のシーケンス図
Reactive Streamsの正常時のシーケンス図
  1. SubscriberがPublisherを購読(subscribe)する。
  2. PublisherがSubscriptionを生成し購読開始(onSubscribe)したことを通知する。
  3. Subscriberが受け取ったSubscriptionを通して最初に受け取るデータ数をリクエストする。
  4. Publisherがデータを通知する。
  5. Subscriberが受け取ったデータを処理する。
  6. Subscriberがリクエストしたデータ数分の処理を行ったら、再度通知するデータ数をリクエストする。
  7. Publsisherがすべてのデータを通知し終えたら、完了を通知する。
  8. Subscriberが完了の通知を受け取り処理をする。

 また、見逃しがちですが、完了の通知を受け取った際の処理を行うonCompleteメソッドの名前がRxJava 1.xと異なり、最後の「d」がなくなっていることに注意してください。

 その他にもReactive Streamsで名前が変わったメソッドにSubscriptionの「cancel」メソッドがあります。これはRxJava 1.xのときは「unsubscribe」と呼ばれていたメソッドで購読を解除する際に使われるメソッドです。

 その他にも、今回は説明しませんがProcessorというPublisherとSubscriberの両方を継承したインタフェースもあります。もし、Reactive Streamsについてさらに詳しく知りたい場合は、英語になりますがReactive Streamsの公式サイトやGitHubを参照してください。

Reactive Streamsのルール

 Reactive Streamsは、RxJavaから影響を受けているように多くのルールがRxJava 1.xと同じです。根本となるルールとして次のものがあります。

  • 通知はシーケンシャル(逐次的)に行われる。つまり、複数の通知を同時に行わない。
  • Publisherの処理は完了(onComplete)もしくはエラー(onError)を通知することで終了する。
  • 完了もしくはエラーを通知したらそれ以降は通知を行わない。

 ただし、RxJava 1.xとReactive Streamsとの間で異なる部分もあり、大きな違いの一つはnullが通知できなくなった点でしょう。Reactive Streamsではデータの通知やエラーの通知をする際にnullを通知しようとするとNullPointerExceptionを発生させる仕様になっています。

  • nullを通知することはできない。

 この変更はRxJava 1.xからの移行をする際に大きな影響がでる可能性があります。基本的にRxJava 1.xの場合でも対象外のデータを除外するfilterメソッドを使うなどしてnullをそのまま扱うことはあまりなかったとは思いますが、それでも生産者はfilterメソッドの前にnullを通知していることになります。そのためRxJava 1.xではObservableのオペレータなどで行っていたnullに対する除外処理を、RxJava 2.xでは生産者が通知する処理内で行うように変更しないといけなくなっています。

 さらにRxJavaでは、先のルールの一つである通知をシーケンシャルに行う点について、SubscriberのonSubscribeメソッドでrequestメソッドを呼んだタイミングでデータが通知されることもあり、SubscriberのonNextメソッドがonSubscribeメソッドの処理が終わる前に実行されることがあるので注意が必要です。

 さらに、これらに加えバックプレッシャーに関した次のルールも覚えておかなければなりません。

  • Publisherは最初にonSubscribeの通知を行う。
  • 購読したSubscriberは一度だけonSubscribeの通知を受け取る。
  • リクエストするデータ数が0以下の場合はIllegalArgumentExceptionをエラーとして通知する。
  • リクエストするデータ数がLong.MAX_VALUEの場合、通知するデータ数は無制限と見なすことができる。
  • データ数のリクエストはSubscriberのonSubscribeメソッドかonNextメソッドから呼ばれなければならない。
  • データ数のリクエストはSubscriptionが同じスレッド上もしくは同期がとられた状態で行わなければならない。

 これらの中で、リクエストするデータ数が0以下の場合に発生するIllegalArgumentExceptionはRxJavaだとSubscriberにonErrorとして通知されずに単にエラーのスタックトレースが出力されるだけなので注意してください。また、リクエストするデータ数がLong.MAX_VALUEの場合、RxJavaでは通知できるデータ数は無制限と見なされ、再度データ数をリクエストする必要がなくなり、Publisherは生成したすべてのデータを通知するようになります。

 またRxJavaの場合、バックプレッシャーのデータ数はリクエストを受けるたびに加算されるようになっています。つまり、リクエストしたデータ数が残っている段階でさらにデータ数をリクエストを受けた場合、残っているデータ数に対し新たにリクエストしたデータ数が加算されます。例えば最初に「データ数 = 100」とリクエストして1件のデータを処理した後にさらに「データ数 = 100」とリクエストした場合、通知するデータ数の上限は199となります。このことを意識していないと、意図せずリクエストしたデータ数がLong.MAX_VALUEになってしまい、無制限にデータを通知することになってしまうので注意が必要です。

 そして、購読をキャンセル際のルールもあり、次のようになっています。

  • 購読をキャンセルする場合はSubscriptionが同じスレッド上もしくは同期がとられた状態で行わなければならない。
  • キャンセルを呼び出した後は同じSubscriptionのrequestメソッドおよびcacnelメソッドは無視される。
  • キャンセル時の処理はスレッドセーフでなければならない。

  • LINEで送る
  • このエントリーをはてなブックマークに追加

修正履歴

  • 2016/10/31 18:23 「Reactive Streamsのルール」の項を更新。

  • 2016/10/24 15:56 3ページ目の「Reactive Streamsのルール」の解説を加筆修正しました。

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5