CodeZine(コードジン)

特集ページ一覧

RxJava 2.xで導入されたReactive Streams

RxJavaによるリアクティブプログラミング入門(3)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/10/11 14:00

目次

RxJava 2.xでのReactive Streams対応とRxJava 1.xからの変更

 前に述べたように、RxJavaが2.xにバージョンアップする目的の一つはReactive Streamsの対応です。そのためRxJava 1.xでは他のライブラリの依存はありませんでしたが、RxJava 2.xではReactive Streamsが提供するAPIに依存することになります。

 また、RxJava 2.xへのバージョンアップによる大きな変更に一つに、生産者と消費者の構成がRxJava 1.xではObservableとObserverの構成の一つだったのがRxJavaでは2つのグループに分割されたことがあります。このグループの一つはReactive Streamsの仕様を実装したFlowableとSubscriberの構成であり、もう一つはバックプレッシャーの機能がないObservableとObserverの関係です。

 FlowableはReactive Streamsの生産者であるPublisherの実装クラスで、SubscriberはReactiveStreamsのものになります。それに対しRxJava 2.xのObservableはReactive Streamsの実装を行っておらず、Reactive StreamsのAPIと連携することはまったくありません。

RxJava 1.xからRxJava 2.xの遷移
RxJava 1.xからRxJava 2.xの遷移

 FlowableとSubscriberの構成はReactive StreamsのAPIを使っており、Publisherの実装クラスがFlowableになっています。

Flowable-Subscriberの構成

Flowable-Subscriberの構成

 これに対し、ObservableとObserverの構成はFlowableとSubscriberの構成と同じような作りになっていますが、Reactive Streamsの対応は行っておらずバックプレッシャー機能がない構成となっています。そのため、FlowableとSubscriberの構成で使われているSubscriptionの代わりにDisposableという購読を破棄するための機能しか持っていないインタフェースを扱います。

Observable-Observerの構成

Observable-Observerの構成

 このDisposableは次のようになっており、disposeメソッドがSubscriberのcancelメソッドに相当し購読を破棄するために使われます。

/** 購読を解除するためのインタフェース */
public interface Disposable {
  /** 購読を破棄する */
  void dispose();
  
  /** すでに購読が破棄されているならtrueを返す */
  boolean isDisposed();
}

 RxJava 1.xからあるObservableとObserverなのですが、RxJava 2.xからはAPIも若干変更され、Flowable同様にObservableのsubscribeメソッドには戻り値がなくなっています。さらにSubscriber同様にObserverは新たにonSubscribeメソッドが追加されています。

 そして、前回は紹介していなかったのですが、RxJava 1.xのObservableとObserverの構成にもバックプレッシャーの機能は持っていたのですが、RxJava 2.xからはRxJava 1.xにあった「onBackpressure」で始まるメソッドなどのバックプレッシャーの機能が削除されています。

 このようにRxJava 2.xでは、Reactive Streamsに対応してバックプレッシャーの機能を持つFlowableの構成と、Reactive Streamsに対応せずバックプレッシャーの機能がないObservableの構成とに分割されたことがRxJava 1.xからの大きな構成の変化になります。そして、SubscriberやObserverを引数に取るsubscribeメソッドの戻り値がなくなったことにより、購読を途中でやめるにはSubscriberやObserverの内部でSubscriptionやDisposableを使って行うようにデザインが変更されています。

RxJava独自のsubscribeメソッド

 それでは、データが通知された際の処理しか必要ないような場合はどうなるのでしょうか? 完了時やエラー時に何もしないSubscriberを毎回生成しないといけないのでしょうか? この問題に対しRxJava 1.xでは、データ通知時の処理を行う関数型インタフェースを引数に持つsubscribeメソッドを提供してきました。それではRxJava 2.xではどうなのでしょうか?

 実はRxJava 2.xでも関数型インタフェースを受け取るsubscribeメソッドは用意されています。しかし、SusbcriberやObserverを引数に持つsubscribeメソッドのように戻り値がないようにすると、購読を途中でやめる手段がなくなってしまいます。そのため、関数型インタフェースを引数に取るsubscribeメソッドでは戻り値としてDisposableを返すようになっています。

// onNext時の処理(受け取ったデータを出力する)だけの例
Disposable disposable = flowable.subscribe(System.out::println);

 このDisposableは購読をやめさせる機能を持つインタフェースでdisposeメソッドを呼ぶことで処理を開始したFloawableやObservableに対して処理をやめさせられることができるようになります。また、このDisposableのdisposeメソッドはSubscriptionのcancelメソッドと同等で、実際にFlowableのsubscribeメソッドから生成されたDisposableのdisposeメソッドを呼ぶと内部でSubscriptionのcancelメソッドを呼ぶようになっています。

 また、Flowableで関数型インタフェースを受け取るsubscribeメソッドを使った場合、デフォルトではonSubscribe時にLong.MAX_VALUEのリクエストがされます。そのため、通知するデータ数の制限がなくなっているため、次のデータ数をリクエストをする必要がありません。

まとめ

 今回はRxJava 2.xが対応することにしたReactive Stremasについて見ていき、さらにRxJava 2.xがReactive Streamsをどのように対応し、それに伴いRxJava 1.xからどのように変わったのかについて見てきました。次回は、前回実装したRxJava 1.xのサンプルをRxJava 2.xではどのように実装するのか見ていきます。



  • LINEで送る
  • このエントリーをはてなブックマークに追加

修正履歴

  • 2016/10/31 18:23 「Reactive Streamsのルール」の項を更新。

  • 2016/10/24 15:56 3ページ目の「Reactive Streamsのルール」の解説を加筆修正しました。

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5