ParallelFlowableの購読(subscribeメソッド)
PrallelFlowableを購読する場合、レールの数だけSubscriberを用意しないといけません。レールの数と異なっていると、java.lang.IllegalArgumentExceptionが各Subscriberにエラーとして通知されます。ParallelFlowableを直接subscribeメソッドを使って購読する場合はparallelメソッドを呼ぶ際に引数のレール数(parallelism
)を指定しておくことが大事になります。そうしないと実行環境によってレール数が変わってしまうので、ある環境では問題なく動くものが別の環境だとエラーを通知するようになり、意図した動きをしなくなる可能性が高くなります。
また、ParallelFlowableのsubscribeメソッドの引数はSubscriberの配列を受け取るようになっており、戻り値は返しません。そして、FlowableのsubscribeメソッドのようにDisposableを返すsubscribeメソッドは用意されていません。
ParallelFlowableのsubscribeメソッド
-
subscribe(Subscriber<? super T>[] subscribers)
PrallelFlowableの完了通知は、元となるFlowableが完了を通知すると、購読している全てのSubscriberに対して行われます。しかし、エラー通知の場合は完了の場合と異なり、どこでエラーが発生したかによってエラーの通知が変わります。
ParallelFlowableのエラー通知のポイント
- 元となるFlowableの処理中にエラーが発生した場合
- ParallelFlowableでの処理中にエラーが発生した場合
まず、元となるFlowableの処理中にエラーが発生した場合、発生したエラーオブジェクトと供に全てのSubscriberに対してエラー通知を行います。
public static void main(String[] args) throws Exception { ParallelFlowable<Integer> parallelFlowable = Flowable.range(1, 10) // ParallelFlowableに変換する前に例外を発生させる .doOnNext(data -> { if (data == 3L) { throw new Exception("例外発生"); } }) // ParallelFlowableに変換する .parallel(2) // 非同期で処理を行う .runOn(Schedulers.computation()); // Subscriberの配列の作成 @SuppressWarnings("unchecked") Subscriber<Integer>[] subscribers = new Subscriber[2]; subscribers[0] = new DebugSubscriber<Integer>("No.1"); subscribers[1] = new DebugSubscriber<Integer>("--- No.2"); // 購読する parallelFlowable.subscribe(subscribers); // しばらく待つ Thread.sleep(1000L); }
RxComputationThreadPool-1: No.1: 1 RxComputationThreadPool-2: --- No.2: 2 RxComputationThreadPool-1: No.1: java.lang.Exception: 例外発生 RxComputationThreadPool-2: --- No.2: java.lang.Exception: 例外発生
それに対し、ParallelFlowableに変換した後の処理でエラーが発生した場合は、そのエラーが発生したレールを購読しているSubscriberにのみエラーが通知され、他のSubscriberは正常時と同じようにデータを受け取ります。
public static void main(String[] args) throws Exception { ParallelFlowable<Long> parallelFlowable = Flowable.interval(100L, TimeUnit.MILLISECONDS).take(10) // ParallelFlowableに変換する .parallel(2) // 非同期で処理を行う .runOn(Schedulers.computation()) // 並列モードの後に例外を発生させる .doOnNext(data -> { if (data == 3L) { throw new Exception("例外発生"); } }); // Subscriberの配列の作成 @SuppressWarnings("unchecked") Subscriber<Long>[] subscribers = new Subscriber[2]; subscribers[0] = new DebugSubscriber<Long>("No.1"); subscribers[1] = new DebugSubscriber<Long>("--- No.2"); // 購読する parallelFlowable.subscribe(subscribers); // しばらく待つ Thread.sleep(2000L); }
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-2: --- No.2: java.lang.Exception: 例外発生 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: complete
このように、例外が発生してもエラーの通知を受け取るのはエラーが発生したレールのSubscriberのみで、他のSubscriberはそのままデータを受け取り、処理を続けるようになります。また、エラーの通知後に受け取っているデータが全てのデータを受け取っていないことも注目すべきポイントでしょう。これはエラーが発生したレールにも通知データを振り分けているためで、あまり好ましいデータの振り分けではありません。
しかし、RxJava 2.0.8よりレール上で発生した処理に対するデータ通知の対応としてParallelFailureHandlingという構造体(Enum)が追加され、mapメソッドとfilterメソッド、およびdoOnNextメソッドの引数に指定できるようになりました。このParallelFailureHandlingはエラーが発生した際のふるまいをどうするか指定するための構造体で、指定することでエラー発生後のデータの振り分けをある程度は制御することができるようになります。ParallelFailureHandlingには次の種類があります。
ParallelFailureHandling | 説明 |
---|---|
ERROR | レール上の処理でエラーが発生したら、そのレール上のSubscriberにはエラーとして通知し、それ以降のデータは残りのSubscriberが受け取る。 |
STOP | レール上の処理でエラーが発生したら、そのレール上のSubscriberは完了の通知を受け取り、それ以降のデータは残りのSubscriberが受け取る。 |
SKIP | レール上の処理でエラーが発生したら、そのレール上のSubscriberはそのデータの処理をスキップして、それ以降の処理を行う。 |
RETRY | レール上の処理でエラーが発生したら、そのレール上のSubscriberは再度そのデータを使って処理を行う。 |
例えば、先ほどのエラーを発生させるサンプルのdoOnNextメソッド部分にParallelFailureHandlingを設定し、実行したとします。
…略 .doOnNext(data -> { if (data == 3L) { throw new Exception("例外発生"); } }, ParallelFailureHandling.○○○○); // ○○○○にオプションを指定する …略 }
これを実行するとそれぞれ次の結果を得ることになります。
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-2: --- No.2: java.lang.Exception: 例外発生 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 5 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 7 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: 9 RxComputationThreadPool-1: No.1: complete
※データ「3」でエラーの通知を受け、他のSubscriberがそれ以降のデータを受け取る
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-2: --- No.2: complete RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 5 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 7 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: 9 RxComputationThreadPool-1: No.1: complete
※データ「3」で完了の通知を受け、他のSubscriberがそれ以降のデータを受け取る
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-2: --- No.2: 5 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-2: --- No.2: 7 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-2: --- No.2: 9 RxComputationThreadPool-1: No.1: complete RxComputationThreadPool-2: --- No.2: complete
※データ「3」の通知の処理をスキップしている
ParallelFailureHandling.RETRYを指定する場合は、そのエラーが回復可能なエラーでない限り、そのデータの処理を繰り返すことになるため、何度かリトライしたら例外が発生しないようサンプルのコードを変えます。
…略 ParallelFlowable<Long> parallelFlowable = Flowable.rangeLong(0L, 10) // 通知間隔を開けないためにrangeLongに変更 …略 // 並列モードの後に例外を発生させる .doOnNext(new Consumer<Long>() { private int count = 0; @Override public void accept(Long data) throws Exception { if (data == 3L && 3 > count++) { throw new Exception("例外発生"); } } }, ParallelFailureHandling.RETRY); // エラー時の対応 …略
これを実行すると以下のようになり、エラーが発生していたデータ(3)をそのレール上で再度実行し、エラーにならずに処理ができてから後続の処理を行っていることがわかります。
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: complete RxComputationThreadPool-2: --- No.2: 3 RxComputationThreadPool-2: --- No.2: 5 RxComputationThreadPool-2: --- No.2: 7 RxComputationThreadPool-2: --- No.2: 9 RxComputationThreadPool-2: --- No.2: complete
ただし、このリトライは回復可能なエラーの場合は有効ですが、回復しないエラーの場合は同じ処理を延々と繰り返すことになるので注意してください。
購読解除(Subscriberからのcancelメソッド/disposeメソッド)
それでは、次にParallelFlowableの購読解除について見ていきましょう。基本的にはsubscribeメソッドがDisposableを戻り値として返さないことより、現状はParellelFlowableの購読解除がSubscriber外から行われないと想定した設計になっていると考えられます。そうなると、購読を解除するにはSubscriber内でcancelメソッド(およびDisposableSubscriberなどの場合はdisopseメソッド)を呼んで購読解除することになります。そして、購読解除が行われるSubscriberはその購読解除を呼び出したSubscriberのみとなり、他のSubscriberはそのまま購読を続けることになります。
@SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { ParallelFlowable<Long> parallelFlowable = Flowable.interval(100L, TimeUnit.MILLISECONDS).take(10) // ParallelFlowableに変換する .parallel(2) // 非同期で処理を行う .runOn(Schedulers.computation()); Subscriber<?>[] subscribers = { // 通常のデバッグ用Subscriber new DebugSubscriber<Long>("No.1"), // データ「3」でキャンセルするSubscriber new CancelOn3Subscriber("--- No.2") }; // 購読する parallelFlowable.subscribe((Subscriber<Long>[]) subscribers); // しばらく待つ Thread.sleep(2000L); } /** データ「3」を受け取ると購読解除するSubscriber */ private static class CancelOn3Subscriber extends DebugSubscriber<Long> { public CancelOn3Subscriber(String name) { super(name); } @Override public void onNext(Long data) { // データが「3」を受け取った際にキャンセルを行う if (data.equals(3L)) { super.dispose(); System.out.println("購読解除しました。 受け取ったデータ=" + data); return; } super.onNext(data); } }
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 購読解除しました。 受け取ったデータ=3 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 5 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 7 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: 9 RxComputationThreadPool-1: No.1: complete
実行結果より、購読解除をしたNo.2のSubscriberはそれ以降データを受け取っていませんが、No.1のSubscriberはそれ以降も通知されたデータを受け取り続けていることがわかります。
注意すべき点として、購読解除のふるまいは2.0.7から変更されています。2.0.6までは1つのSubscriberが購読解除をするとParallelFlowable自体が通知を止めてしまい、全てのSubscriberがデータを受け取らないようになっていました。そのため、ParallelFlowableに対して購読解除を行っている場合は、どのバージョンを使っているのかを確認しましょう。RxJavaのバージョンを更新する必要がある場合は、バージョンを上げることでふるまいが変わらないのかの確認が必要になります。また、この並列処理の機能自体はまだベータ版なので、今後も仕様が変わる可能性があることに注意してください。