Flowable/Observableを生成するオペレータ
Flowable/Observableを生成するオペレータには前回見たcreateメソッドの他にも、さまざまなオペレータがあります。ここでは前回のサンプルで使ったcreateメソッドを除いた代表的なオペレータとして次のものを見ていきます。
- just
- fromArray/fromIterable
- interval
- timer
この他にもFlowable/Observableを生成するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。
just
主なメソッド
-
just(T item)
-
just(T item1, T item2)
-
just(T item1, T item2, T item3)
と増えていき最大10個の引数を持つメソッドが用意されています。
justメソッドは引数に渡したデータを通知するFlowable/Observableを生成するオペレータです。引数は最大10個まで指定することが可能で、左から順にデータが通知されます。すべてのデータを通知したら完了(onComplete)を通知します。
サンプル
次のサンプルでは、justメソッドの引数に渡した「A」「B」「C」「D」「E」を順に通知するFlowableを生成しています。すべてのデータを通知し終えたら完了(onComplete)を通知します。
public static void main(String[] args) { // 引数のデータを順に通知するFlowableの生成 Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E"); // ① // 購読開始 flowable.subscribe(new DebugSubscriber<>()); }
- justメソッドを使って、引数のデータを通知するFlowableを生成する。
main: A main: B main: C main: D main: E main: 完了
実行結果より、引数のデータを左から順にSubscriberに通知し、すべてのデータを通知後に完了を通知していることが分かります。
fromArray/fromIterable
主なメソッド
-
fromArray(T... items)
-
fromIterable(Iterable<? extends T> source)
fromArrayメソッドは引数に指定した配列の要素を、fromIterableメソッドはListなどのIterableの要素を順にデータとして通知するFlowable/Observableを生成するオペレータです。生成したFlowable/Observableはすべてのデータを通知し終えたら、完了を通知します。また、fromArrayメソッドの引数は配列の要素を直接指定することもできます。
サンプル
次のサンプルでは、配列の要素を順に通知するFlowableを生成しています。今回は配列のインスタンスをつくらず要素を直接引数に指定しています。配列のすべての要素をデータとして通知し終えたら完了(onComplete)を通知します。
public static void main(String[] args) { // 配列のデータを順に通知するFlowableの生成 Flowable<String> flowable = Flowable.fromArray( "A", "B", "C", "D", "E" ); // ① // 購読開始 flowable.subscribe(new DebugSubscriber<>()); }
- 引数の配列の要素を順に通知するFlowableを生成する。
main: A main: B main: C main: D main: E main: 完了
実行結果より、引数の配列の要素が順に通知し、すべてのデータを通知後に完了を通知していることが分かります。
interval
主なメソッド
-
interval(long time, TimeUnit unit)
-
interval(long time, TimeUnit unit, Scheduler scheduler)
-
interval(long initialDelay, long time, TimeUnit unit)
-
interval(long initialDelay, long time, TimeUnit unit, Scheduler scheduler)
intervalメソッドは指定した通知間隔(インターバル)で0から始まるLong値のデータを通知するFlowable/Observableを生成するオペレータです。通知されるデータは「0」「1」「2」と順に通知されていきます。このintervalメソッドで生成されたFlowable/ObservableはデフォルトでSchedulers.computation()
のScheduler上で実行され、呼び出し元のスレッドとは異なるスレッド上で実行されます。このSchedulerを変更するには引数にSchedulerを受け取るメソッドがあり、それを使うことでデフォルトのSchedulerを変更できます。
また、最初の通知データである「0」を通知するタイミングは、デフォルトでは処理を開始してすぐではなく、指定したインターバルだけ開けて通知します。しかし、引数に最初のデータ通知の待機時間(initialDelay)を指定できるものもあり、そのメソッドを使うことで最初の「0」を通知するタイミングだけ変えることができます。
そして、intervalメソッドで生成したFlowable/Observableは完了をすることがないので、完了を通知するにはtakeメソッドなどで通知するデータの個数を制限するなどしないといけません。
あと、これはJavaで時間を扱う処理をすること全般におけることですが、指定した時間は正確ではなく、CPUの負荷などの影響を受けるため、ある程度の誤差は発生します。
サンプル
次のサンプルでは、intervalメソッドを使って1000ミリ秒ごとに「0」から始まる数値を通知するFlowableを生成しています。また、通知している間隔を見るために、データを受け取った際に実行時刻を出力しています。
public static void main(String[] args) throws Exception { // 「分:秒.ミリ秒」の文字列に変換するFormatter final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("mm:ss.SSS"); // 1000ミリ秒ごとに数値を通知するFlowableの生成 Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS); // ① // 処理を開始する前の時間 System.out.println("開始時間: " + LocalTime.now().format(formatter)); // 購読する flowable.subscribe(data -> { // ② // Thread名の取得 String threadName = Thread.currentThread().getName(); // 現在時刻の「分:秒.ミリ秒」を取得 String time = LocalTime.now().format(formatter); // 出力 System.out.println(threadName + ": " + time + ": data=" + data); }); // しばらく待つ Thread.sleep(5000L); }
- intervalメソッドを使って、1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
- 通知されたデータを受け取った際の時刻を「分:秒.ミリ秒」で出力させる。
開始時間: 55:29.085 RxComputationThreadPool-1: 55:30.105: data=0 RxComputationThreadPool-1: 55:31.105: data=1 RxComputationThreadPool-1: 55:32.106: data=2 RxComputationThreadPool-1: 55:33.105: data=3 RxComputationThreadPool-1: 55:34.105: data=4
実行結果より、ほぼ1秒(1000ミリ秒)単位で「0」「1」「2」と順に数値が通知されていることが分かります。また、デフォルトの設定だと、最初の「0」を通知するまでに1秒(1000ミリ秒)ほど待機しているのも分かります。
timer
-
timer(long time, TimeUnit unit)
-
timer(long time, TimeUnit unit, Scheduler scheduler)
timerメソッドは呼び出されてから指定した時間だけ待機した後、1つのLong値「0」を通知し完了するFlowable/Observableを生成するオペレータです。このtimerメソッドで生成されたFlowable/ObservableはデフォルトでSchedulers.computation()
のScheduler上で実行され、呼び出し元のスレッドとは異なるスレッド上で実行されます。このSchedulerを変更するには引数にSchedulerを受け取るメソッドがあり、それを使うことでデフォルトのSchedulerを変更できます。
また、前述したように指定した時間は正確ではなく、CPUの負荷などの影響を受けるため、ある程度の誤差は発生します。
サンプル
次のサンプルでは、timerメソッドを使って1000ミリ秒後に「0」を通知するFlowableを生成しています。また、通知している待機時間を見るために、データを受け取った際に実行時刻を出力しています。
public static void main(String[] args) throws Exception { // 「分:秒.ミリ秒」の文字列に変換するFormatter final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("mm:ss.SSS"); // 処理を開始する前の時間 System.out.println("開始時間: " + LocalTime.now().format(formatter)); // 1000ミリ秒後に数値「0」を通知するFlowableの生成 Flowable<Long> flowable = Flowable.timer(1000L, TimeUnit.MILLISECONDS); // ① // 購読開始 flowable.subscribe( // 第1引数: データの通知時 data -> { // ② // Thread名の取得 String threadName = Thread.currentThread().getName(); // 現在時刻の「分:秒.ミリ秒」を取得 String time = LocalTime.now().format(formatter); // 出力 System.out.println(threadName + ": " + time + ": data=" + data); }, // 第2引数: エラーの通知時 error -> System.out.println("エラー=" + error), // 第3引数: 完了の通知時 () -> System.out.println("完了")); // ③ // しばらく待つ Thread.sleep(1500L); }
- timerメソッドを使って、1000ミリ秒後に「0」を通知するFlowableを生成。
- 通知されたデータを受け取った際の時刻を「分:秒.ミリ秒」で出力させる。
- 完了の通知を受けとった際に「完了」と出力させる。
開始時間: 55:02.564 RxComputationThreadPool-1: 55:03.684: data=0 完了
実行結果より、ほぼ1000ミリ秒後に「0」を通知していることが分かります。また、データを通知した後に完了も通知していることが分かります。