SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

RxJavaによるリアクティブプログラミング入門

RxJava(2.x)の最初に知っておいてもらいたいオペレータ

RxJavaによるリアクティブプログラミング入門(5)

  • X ポスト
  • このエントリーをはてなブックマークに追加

Flowable/Observableを生成するオペレータ

 Flowable/Observableを生成するオペレータには前回見たcreateメソッドの他にも、さまざまなオペレータがあります。ここでは前回のサンプルで使ったcreateメソッドを除いた代表的なオペレータとして次のものを見ていきます。

  • just
  • fromArray/fromIterable
  • interval
  • timer

 この他にもFlowable/Observableを生成するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

just

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • just(T item)
  • just(T item1, T item2)
  • just(T item1, T item2, T item3)

と増えていき最大10個の引数を持つメソッドが用意されています。

 justメソッドは引数に渡したデータを通知するFlowable/Observableを生成するオペレータです。引数は最大10個まで指定することが可能で、左から順にデータが通知されます。すべてのデータを通知したら完了(onComplete)を通知します。

サンプル

 次のサンプルでは、justメソッドの引数に渡した「A」「B」「C」「D」「E」を順に通知するFlowableを生成しています。すべてのデータを通知し終えたら完了(onComplete)を通知します。

justメソッドのサンプル
public static void main(String[] args) {
  
  // 引数のデータを順に通知するFlowableの生成
  Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E");  // ①
  
  // 購読開始
  flowable.subscribe(new DebugSubscriber<>());
}
  1. justメソッドを使って、引数のデータを通知するFlowableを生成する。
実行結果
main: A
main: B
main: C
main: D
main: E
main: 完了

 実行結果より、引数のデータを左から順にSubscriberに通知し、すべてのデータを通知後に完了を通知していることが分かります。

fromArray/fromIterable

マーブルダイアグラム(fromArray)
マーブルダイアグラム(fromArray)
主なメソッド
  • fromArray(T... items)
  • fromIterable(Iterable<? extends T> source)

 fromArrayメソッドは引数に指定した配列の要素を、fromIterableメソッドはListなどのIterableの要素を順にデータとして通知するFlowable/Observableを生成するオペレータです。生成したFlowable/Observableはすべてのデータを通知し終えたら、完了を通知します。また、fromArrayメソッドの引数は配列の要素を直接指定することもできます。

サンプル

 次のサンプルでは、配列の要素を順に通知するFlowableを生成しています。今回は配列のインスタンスをつくらず要素を直接引数に指定しています。配列のすべての要素をデータとして通知し終えたら完了(onComplete)を通知します。

fromArray(items)のサンプル
public static void main(String[] args) {
  
  // 配列のデータを順に通知するFlowableの生成
  Flowable<String> flowable = Flowable.fromArray(
      "A", "B", "C", "D", "E" );  // ①
  
  // 購読開始
  flowable.subscribe(new DebugSubscriber<>());
}
  1. 引数の配列の要素を順に通知するFlowableを生成する。
実行結果
main: A
main: B
main: C
main: D
main: E
main: 完了

 実行結果より、引数の配列の要素が順に通知し、すべてのデータを通知後に完了を通知していることが分かります。

interval

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • interval(long time, TimeUnit unit)
  • interval(long time, TimeUnit unit, Scheduler scheduler)
  • interval(long initialDelay, long time, TimeUnit unit)
  • interval(long initialDelay, long time, TimeUnit unit, Scheduler scheduler)

 intervalメソッドは指定した通知間隔(インターバル)で0から始まるLong値のデータを通知するFlowable/Observableを生成するオペレータです。通知されるデータは「0」「1」「2」と順に通知されていきます。このintervalメソッドで生成されたFlowable/ObservableはデフォルトでSchedulers.computation()のScheduler上で実行され、呼び出し元のスレッドとは異なるスレッド上で実行されます。このSchedulerを変更するには引数にSchedulerを受け取るメソッドがあり、それを使うことでデフォルトのSchedulerを変更できます。

 また、最初の通知データである「0」を通知するタイミングは、デフォルトでは処理を開始してすぐではなく、指定したインターバルだけ開けて通知します。しかし、引数に最初のデータ通知の待機時間(initialDelay)を指定できるものもあり、そのメソッドを使うことで最初の「0」を通知するタイミングだけ変えることができます。

 そして、intervalメソッドで生成したFlowable/Observableは完了をすることがないので、完了を通知するにはtakeメソッドなどで通知するデータの個数を制限するなどしないといけません。

 あと、これはJavaで時間を扱う処理をすること全般におけることですが、指定した時間は正確ではなく、CPUの負荷などの影響を受けるため、ある程度の誤差は発生します。

サンプル

 次のサンプルでは、intervalメソッドを使って1000ミリ秒ごとに「0」から始まる数値を通知するFlowableを生成しています。また、通知している間隔を見るために、データを受け取った際に実行時刻を出力しています。

interval(time, unit)のサンプル
  public static void main(String[] args) throws Exception {
    // 「分:秒.ミリ秒」の文字列に変換するFormatter
    final DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern("mm:ss.SSS");
    
    // 1000ミリ秒ごとに数値を通知するFlowableの生成
    Flowable<Long> flowable = 
        Flowable.interval(1000L, TimeUnit.MILLISECONDS);  // ①
    
    // 処理を開始する前の時間
    System.out.println("開始時間: " + LocalTime.now().format(formatter));
    
    // 購読する
    flowable.subscribe(data -> {  // ②
      // Thread名の取得
      String threadName = Thread.currentThread().getName();
      // 現在時刻の「分:秒.ミリ秒」を取得
      String time = LocalTime.now().format(formatter);
      // 出力
      System.out.println(threadName + ": " + time + ": data=" + data);
    });
    
    // しばらく待つ
    Thread.sleep(5000L);
  }
  1. intervalメソッドを使って、1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
  2. 通知されたデータを受け取った際の時刻を「分:秒.ミリ秒」で出力させる。
実行結果
開始時間: 55:29.085
RxComputationThreadPool-1: 55:30.105: data=0
RxComputationThreadPool-1: 55:31.105: data=1
RxComputationThreadPool-1: 55:32.106: data=2
RxComputationThreadPool-1: 55:33.105: data=3
RxComputationThreadPool-1: 55:34.105: data=4

 実行結果より、ほぼ1秒(1000ミリ秒)単位で「0」「1」「2」と順に数値が通知されていることが分かります。また、デフォルトの設定だと、最初の「0」を通知するまでに1秒(1000ミリ秒)ほど待機しているのも分かります。

timer

マーブルダイアグラム
マーブルダイアグラム
  • timer(long time, TimeUnit unit)
  • timer(long time, TimeUnit unit, Scheduler scheduler)

 timerメソッドは呼び出されてから指定した時間だけ待機した後、1つのLong値「0」を通知し完了するFlowable/Observableを生成するオペレータです。このtimerメソッドで生成されたFlowable/ObservableはデフォルトでSchedulers.computation()のScheduler上で実行され、呼び出し元のスレッドとは異なるスレッド上で実行されます。このSchedulerを変更するには引数にSchedulerを受け取るメソッドがあり、それを使うことでデフォルトのSchedulerを変更できます。

 また、前述したように指定した時間は正確ではなく、CPUの負荷などの影響を受けるため、ある程度の誤差は発生します。

サンプル

 次のサンプルでは、timerメソッドを使って1000ミリ秒後に「0」を通知するFlowableを生成しています。また、通知している待機時間を見るために、データを受け取った際に実行時刻を出力しています。

timer(time, unit)のサンプル
public static void main(String[] args) throws Exception {
  // 「分:秒.ミリ秒」の文字列に変換するFormatter
  final DateTimeFormatter formatter =
      DateTimeFormatter.ofPattern("mm:ss.SSS");
  
  // 処理を開始する前の時間
  System.out.println("開始時間: " + LocalTime.now().format(formatter));
  
  // 1000ミリ秒後に数値「0」を通知するFlowableの生成
  Flowable<Long> flowable = 
      Flowable.timer(1000L, TimeUnit.MILLISECONDS);  // ①
  
  // 購読開始
  flowable.subscribe(
      // 第1引数: データの通知時
      data -> {  // ②
        // Thread名の取得
        String threadName = Thread.currentThread().getName();
        // 現在時刻の「分:秒.ミリ秒」を取得
        String time = LocalTime.now().format(formatter);
        // 出力
        System.out.println(threadName + ": " + time + ": data=" + data);
      },
      // 第2引数: エラーの通知時
      error -> System.out.println("エラー=" + error),
      // 第3引数: 完了の通知時
      () -> System.out.println("完了"));  // ③
  
  // しばらく待つ
  Thread.sleep(1500L);
}
  1. timerメソッドを使って、1000ミリ秒後に「0」を通知するFlowableを生成。
  2. 通知されたデータを受け取った際の時刻を「分:秒.ミリ秒」で出力させる。
  3. 完了の通知を受けとった際に「完了」と出力させる。
実行結果
開始時間: 55:02.564
RxComputationThreadPool-1: 55:03.684: data=0
完了

 実行結果より、ほぼ1000ミリ秒後に「0」を通知していることが分かります。また、データを通知した後に完了も通知していることが分かります。

次のページ
通知するデータを制限するオペレータ

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
RxJavaによるリアクティブプログラミング入門連載記事一覧

もっと読む

この記事の著者

須田 智之(スダ トモユキ)

十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/9879 2017/01/24 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング