CodeZine(コードジン)

特集ページ一覧

RxJava 2.xで導入されたReactive Streams

RxJavaによるリアクティブプログラミング入門(3)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/10/11 14:00

目次

Reactve Streamsとは

 Reactive Streamsとは、前述したようにリアクティブプログラミングを行うための共通のインタフェースを提供するものです。Reactive Streamsは、このインタフェースを介して次の特徴を持つ仕組みを提供します。

  1. データストリーム(Data Stream)
  2. シーケンシャル(逐次的)な通知
  3. 非同期なデータの受け渡し
  4. バックプレッシャー(Backpressure)

 1から3に関しては、すでに前回までの連載で解説したRxJava 1.xにもあった機能なので今回は説明を省きますが、バックプレッシャーについては新しいコンセプトなので詳しく見ていきましょう。ちなみにバックプレッシャーの機能はRxJava 1.xにも含まれています。

バックプレッシャー(Backpressure)

 バックプレッシャーについて知るためには、まずデータを通知する生産者と受け取ったデータを処理する消費者がそれぞれ別のスレッド上で処理をしている状況で、生産者の処理スピードが速く、消費者の処理スピードが遅い場合を想像してください。

非同期なデータの通知
非同期なデータの通知

 この状況の場合、生産者がどんなにデータを通知しても消費者の処理が追い付かずに処理待ちのデータがどんどん貯まっていってしまいます。例えば、生産者が1秒間に100件のデータを通知するのに対し、消費者が1秒間に1件しかデータを処理できない場合、1秒後には99件のデータが残ることになり、さらに新たな100件のデータが加えられることになってしまいます。このような状況下では、時間が経つごとに処理待ちのデータが貯まっていってしまい、最新の結果を素早く受け取ることができなくなり、最終的にメモリーが足らなくなってシステムがクラッシュするなどの問題が発生する可能性があります。

 この問題を解決するのがバックプレッシャーで、生産者が通知するデータの量を抑制することで、消費者が受け取ったデータを処理しきれず大量のデータが処理待ちの状況になってしまうことを避けられるようにします。これを実現するために、消費者はデータを受け取る前に処理しきれるデータ数を生産者にリクエストし、生産者はそのデータ数分のデータを通知するようにします。

 バックプレッシャーの処理の流れは次のようになります。

  1. 消費者が指定したデータ数だけデータを通知するようにリクエストする
  2. 生産者がリクエストを受けたデータ数だけデータを通知する。
  3. 生産者はリクエスト分のデータを通知したら通知を止めるが、データの生産は続ける。
  4. 消費者が受け取った最後のデータを処理したら、再度データ数をリクエストする。
  5. リクエスト分のデータを再度通知し始める。

 このようにすべてのデータを通知し終えるまで、生産者はリクエストを受けた分だけのデータを通知し、消費者は処理しきれるデータ数をリクエストをすることを繰り返し続けます。また、購読を開始したタイミングで消費者がデータ数をリクエストすることで、生産者がデータの通知を始めることができるようになります。

バックプレッシャーのシーケンス図
バックプレッシャーのシーケンス図

 それでは生産者が生成しても通知ができていないデータはどうなるのでしょうか? これはバックプレッシャーの設定によりますが、新たなリクエストが来るまでに生成したデータを通知することなく破棄したり、次のリクエストが来るまで貯めておいたりできます。

データを破棄する場合
データを破棄する場合
データを貯める場合
データを貯める場合

 この他にも処理待ちのデータに対するバックプレッシャーのオプションはありますが、大まかに分類すると破棄するかバッファするかになります。ただし、処理されていないデータをバッファする場合、すべてのデータをバッファすると結局はバックプレッシャーがない場合と同じ問題に直面するため、バッファできるデータ数を指定しておくことが基本になります。

 RxJavaではバッファのサイズが指定でき、指定したバッファサイズを超えた場合にすぐエラーを通知することができます。そうすることで早い段階でエラーを返すことができ、通知できずバッファされているデータが過多になっていることが原因で問題が発生していることを素早く伝えることができます。また、データを破棄する場合もRxJavaでは内部のバッファサイズを超したものに対して破棄するようになっている点に注意が必要です。

 また、サンプルや図などでは分かりやすさを優先して少ないデータ数でのリクエストを行っていますが、少ないデータ数でのリクエストを頻繁にやり取りすることはオーバーヘッドになるので、実際には極端に小さい単位で頻繁にリクエストするのは避けた方がよいでしょう。

 ちなみにRxJava 1.xにもバックプレッシャーの機能は用意されています。すごく簡単に説明すると、Observerの実装クラスであるSubscriberからデータ数のリクエストを行い、Observableからリクエストしたデータ数分のデータを通知できるようになっています。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

修正履歴

  • 2016/10/31 18:23 「Reactive Streamsのルール」の項を更新。

  • 2016/10/24 15:56 3ページ目の「Reactive Streamsのルール」の解説を加筆修正しました。

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5