SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

RxJavaによるリアクティブプログラミング入門

RxJava(2.x)の便利なオペレータ(結合/ユーティリティ/デバッグ)

RxJavaによるリアクティブプログラミング入門(6)

  • X ポスト
  • このエントリーをはてなブックマークに追加

 この連載はRxJavaを使って、リアクティブプログラミングにおけるポイントやRxJavaが持つ機能について学んでいくことを目的としています。前回はRxJava 2.xの最も基本となるオペレータについて解説しました。今回は前回紹介していない複数のFlowable/Observableを結合するオペレータとユーティリティ系のオペレータ、そしてデバッグ用(ログ出力)に使えるオペレータの中でも代表的なものについて見ていきます。今回もサンプルはFlowableを使って実装していますが、Observableの場合もバックプレッシャーを扱わない限り、基本的にはほぼ同じ使い方になります。また、データを受け取るSubscriberとして、特別なことをしない限り、第5回で作成したDebugSubscriberを使用しています。

  • X ポスト
  • このエントリーをはてなブックマークに追加

対象読者

  • Java経験者(初心者可)
  • RxJava未経験者
  • リアクティブプログラミング未経験者

 ※ただし、前回までの連載を読んでいる前提です。

複数のFlowable/Observableを結合するオペレータ(1)

 複数のFlowable/Observableを結合するオペレータは大きく分けて2つの種類があり、一つは単に結合してそれぞれのデータを通知するもの、もう一つは各Flowable/Observableが通知したデータを組み合わせて新しいデータを生成してから通知するものです。ここではこれらの代表的なオペレータとして次のものを見ていきます。

  • merge
  • combineLatest
  • zip

 この他にも複数のFlowable/Observableを結合するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

merge

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2)
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2, Publisher/ObservableSource<? extends T> source3)
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2, Publisher/ObservableSource<? extends T> source3, Publisher/ObservableSource<? extends T> source4)

 「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。

 mergeメソッドは複数のFlowable/Observableが通知するデータを1つのFlowable/Observableを通して通知するオペレータです。このメソッドを使うことで、複数のFlowable/Observableの通知を1つのSubscriber/Observerに購読させることができるようになります。処理を開始する際は引数のFlowable/Observableを同時に実行させ、それぞれがデータを通知するタイミングで結果のFlowable/Observableからデータが通知されます。ただし、複数のFlowable/Observableが同時にデータを通知しても、結果として通知する際は同期が取られシーケンシャルに通知されるようになっています。そして、完了を通知するタイミングは引数の全てのFlowable/Observableが完了する時になります。mergeメソッドは引数に最大4つまでのFlowable/Observableを取りますが、それより多くのFlowable/Observableを結合したい場合は、引数を配列で取るmergeArrayメソッドを使うことで対応することができます。

 ちなみに、引数のFlowable/Observableのどれかがエラーを通知したら、そのタイミングでエラーを通知し処理が終了します。もし、エラーの通知を保留して他の正常なFlowable/Observableを処理してから最後にエラーを通知したい場合は、mergeDelayErrorメソッドを使うことで対応することができます。

サンプル

 次のサンプルではintervalメソッドで生成した2つのFlowableについて、mergeメソッドを使って1つのFlowableにまとめ、そのFlowableを経由してデータを通知するようにしています。また、引数となるそれぞれのFlowableは通知する間隔やデータ数が異なるようになっています。このmergeメソッドで生成されたFlowableを実行すると引数のFlowableを全て同時に実行され、引数のFlowableを持つデータがmergeメソッドの結果となるFlowableから通知されることになります。

merge(source1, source2)のサンプル
public static void main(String[] args) throws Exception {
  // マージ対象
  Flowable<Long> flowable1 =  // (1)
      // 300ミリ秒ごとにデータを通知する 
      Flowable.interval(300L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5);
  
  // マージ対象
  Flowable<Long> flowable2 =  // (2)
      // 500ミリ秒ごとにデータを通知する
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 2件まで
          .take(2)
          // 100加算する
          .map(data -> data + 100L);
  
  // 複数のFlowableをマージする
  Flowable<Long> result = Flowable.merge(flowable1, flowable2); // (3)
  
  // 購読する
  result.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
  2. intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って2件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」を通知するFlowableになる
  3. mergeメソッドを使って(1)と(2)のFlowableを1つにまとめ、同時に実行するようにする
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-2: 100
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-2: 101
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 完了

 実行結果より、マージ元のFlowableが同時に処理が実行されていることがわかります。さらに、(2)のFlowableが完了した後も(1)のデータを通知していることより、全てのFlowableが完了を通知するまで全てのデータが通知されることがわかります。

経過時間と通知されるデータ
経過時間 0 300 500 600 900 1000 1200 1500
(1)のFlowable   0   1 2   3 4
(2)のFlowable     100     101    
結果のデータ   0 100 1 2 101 3 4

次のページ
複数のFlowable/Observableを結合するオペレータ(2)

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
RxJavaによるリアクティブプログラミング入門連載記事一覧

もっと読む

この記事の著者

須田 智之(スダ トモユキ)

十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/9974 2017/02/23 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング