CodeZine(コードジン)

特集ページ一覧

RxJava 2.xを用いた簡単なサンプルプログラムの実装

RxJavaによるリアクティブプログラミング入門(4)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2016/11/09 14:00

目次

Flowable(Reactive Streams対応)を使ったサンプル

 それでは、実際にRxJava 2.xを使ったサンプルを作成してみましょう。まずはReactive Streamsに対応しているFlowableで実装した場合を見てみましょう。今回のサンプルもRxJava 1.xのサンプルと同じように「Hello, World!」と「こんにちは、世界!」のデータを通知し、その受け取ったデータを出力して、すべてのデータを通知した後に完了の通知を行い、「完了しました」と出力するようにします。FlowableにはRxJava 1.xのObservableと同様にcreateメソッドがあるので、このcreateメソッドを使って実装を行います。

 また今回のサンプルでは、バックプレッシャーをかけるためにはどのようにデータをリクエストするのかを見るために、データを1件ずつ通知するようにリクエストし、その通知が処理されたら再度1件のデータをリクエストすることを繰り返し行うようにしています。さらに通知するデータ量が少ないこともあり、通知待ちのデータはすべてバッファし通知するまで保持するようにしています。

 今回のサンプルの処理の流れは次のようになります。

サンプルのシーケンス図
サンプルのシーケンス図
  1. SubscriberがFlowableを購読し、Flowableの処理を開始する。
  2. FlowableがSubscriotionを生成する。
  3. Flowableが購読を開始したことをSubscriberに通知(onSubscribe)し、その際に生成したSubscriptionを渡す。
  4. Subscriberが受け取ったSubscriptionのrequestメソッドを使って1件だけデータを通知するようにリクエストする。
  5. Flowableが文字列"Hello, World!"を通知する。
  6. Subscriberがデータを受け取り、"Hello, World!"と出力する。
  7. SubscriberがSubscriptionのrequestメソッドを使って1件だけデータを通知するようにリクエストする。
  8. Flowableが文字列"こんにちは、世界!"を通知する。
  9. Subscriberがデータを受け取り、"こんにちは、世界!"と出力する。
  10. すべてのデータを通知した後にFlowableが完了(onComplete)したことを通知する。
  11. 完了の通知を受け取ったSubscriberが"完了しました"と出力する。

 また、このサンプルではデータを受け取るSubscriberの処理をFlowableの処理とは異なるスレッド上で行うようにしており、どのスレッドで処理を行っているのかを確認するため、実行しているスレッド名も出力しています。

Flowableで実装した場合のサンプル
public static void main(String[] args) throws Exception {
  
  // あいさつの言葉を通知するFlowableの生成
  Flowable<String> flowable =
      Flowable.create(new FlowableOnSubscribe<String>() {
        
        @Override
        public void subscribe(FlowableEmitter<String> emitter)
            throws Exception {
          
          String[] datas = { "Hello, World!", "こんにちは、世界!" };
          
          for (String data : datas) {
            // 購読解除されている場合は処理をやめる
            if (emitter.isCancelled()) {
              return;
            }
            
            // データを通知する
            emitter.onNext(data);
          }
          
          // 完了したことを通知する
          emitter.onComplete();
        }
      }, BackpressureStrategy.BUFFER);  // 超過したデータはバッファする
  
  flowable
      // Subscriberの処理を別スレッドで行うようにする
      .observeOn(Schedulers.computation())
      // 購読する
      .subscribe(new Subscriber<String>() {
      
        /** データ数のリクエストおよび購読の解除を行うオブジェクト */
        private Subscription subscription;
        
        // 購読が開始された際の処理
        @Override
        public void onSubscribe(Subscription subscription) {
          // SubscriptionをSubscriber内で保持する
          this.subscription = subscription;
          // 受け取るデータ数をリクエストする
          this.subscription.request(1L);
        }
        
        // データを受け取った際の処理
        @Override
        public void onNext(String item) {
          // 実行しているスレッド名の取得
          String threadName = Thread.currentThread().getName();
          // 受け取ったデータを出力する
          System.out.println(threadName + ": " + item);
          
          // 次に受け取るデータ数をリクエストする
          this.subscription.request(1L);
        }
        
        // 完了を通知された際の処理
        @Override
        public void onComplete() {
          // 実行しているスレッド名の取得
          String threadName = Thread.currentThread().getName();
          System.out.println(threadName + ": 完了しました");
        }
        
        // エラーを通知された際の処理
        @Override
        public void onError(Throwable error) {
          error.printStackTrace();
        }
      });
    
  // しばらく待つ
  Thread.sleep(500L);
}
実行結果
RxComputationThreadPool-1: Hello, World!
RxComputationThreadPool-1: こんにちは、世界!
RxComputationThreadPool-1: 完了しました

 それでは、サンプルでは何をやっているのか見てみましょう。まずはデータを通知するFlowableについて見ていきます。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門

もっと読む

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

あなたにオススメ

All contents copyright © 2005-2021 Shoeisha Co., Ltd. All rights reserved. ver.1.5