リアクティブストリームとは
リアクティブストリームについては、初回の「Java 9のリリースとこれまでのトレンドを振り返る」でも簡単に紹介しました。並列処理を効率的にするための方法のひとつで、パブリッシュ・サブスクライブ・モデル(以下、PubSubモデル)が使用されています。リアクティブストリームという言葉になじみがない方も、PubSubモデルといえばイメージがつく方もいるのではないでしょうか。
パブリッシュ・サブスクライブ・モデル
このPubSubモデルの概念を示したものが図1です。PubSubモデルは、その名の通り、データを提供するオブジェクト(パブリッシュ)とデータを処理するオブジェクト(サブスクライブ)に分けて管理します。こうすることでデータの提供と処理を疎結合にすることができます。また多くの場合、データを提供する側と、処理をする側を多対多の関係にすることができるので、多様に発生するデータとその処理を行うためのコードを記述しやすくなります。
似たような概念には、メッセージキューや、イベントとイベントリスナがあり、これらとの違いについて疑問に思う方もいるかと思います。
PubSubモデルがメッセージキューと大きく異なる点として、複数の処理で同じデータを利用できることが挙げられます。メッセージキューはデータをすぐに処理せず、データと処理を分離するためにキューにためます。それを別のスレッドやプロセスに移して処理を行います。処理を並列化しやすく、同じ処理をできるだけ高速に行いたい場合に有用です。
PubSubモデルは、複数の処理で同じデータを扱うことができるため、同じデータに対して独立した異なる処理をしたい場合に有効です。処理ごとにメッセージキューがあるイメージととらえることもできますが、処理を追加する側から見ると1つのキューとして扱うことになります。
一方、同じデータで異なる処理をしたい場合には、イベントとイベントリスナでも実現が可能です。しかしこちらはイベントが発生したタイミングですぐに処理が行われてしまうため、イベントリスナ側の処理が遅い場合には問題が発生します。
PubSubモデルはさまざまなところで使われています。例えばIoTで利用されるケースがあるMQTTサーバは、このPubSubモデルをミドルウェアとして提供しているサーバです。さまざまなデバイスが生成するデータを処理しなければならないIoTソリューションではよく使われるモデルです。そのため、PubSubという言葉を知らなくても、実際には使ったことがある方は多いのではないかと思います。
Flowクラスの役割
リアクティブストリーム機能において、JDK9では新たにjava.util.concurrent.Flowクラスが追加されました。このクラスには以下に示す4つのインターフェースがあり、それらのインターフェースを用いてPubSubモデルを構築します。
- Flow.Subscriber
- Flow.Publisher
- Flow.Processor
- Flow.Subscription
これらのインターフェースの関係を示したのが、図2です。
Flow.Publisherは、外部から受け付けたデータをsubscribeメソッドにて登録されたFlow.Subscriberのオブジェクトに提供します。
一方、Flow.Subscriberは登録が完了するとonSubscribeメソッドにて、Flow.Subscriptionのオブジェクトを取得します。データを受信したい場合にはrequestメソッドにて指定したN個のデータを要求し、Flow.Publisherはデータが要求されるとonNextメソッドを通じてデータをFlow.Subscriberに提供します。
また、Flow.Publisherのcloseメソッドでデータ受信の完了を指示すると、Flow.SubscriberにはonCompleteメソッドが実行され、処理が終了するタイミングでSubscriber側でも処理をすることができます。
Flow.ProcessorインターフェースはFlow.PublisherとFlow.Subscriberの双方のインターフェースを持つインターフェースです。例えば、あるSubscriberをPublisherとしても機能させ、別のSubscriberにデータを提供したい場合などに利用します。
続いて、実際のコードを見ながら動作を確認します。リスト1は、Flow.Subscriberインターフェースを実装したサンプルコードです。
public class SampleSubscriber<T> implements Flow.Subscriber<T> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println(Thread.currentThread().getName() + ":Subscriber:onSubscribe(" + subscription + ")"); this.subscription = subscription; // (1) 購読処理が完了したら、最初のデータを1つ要求する subscription.request(1); } @Override // (2) データを受信する public void onNext(T item) { System.out.println(Thread.currentThread().getName() + ":Subscriber:onNext( " + item + " )"); // (3)次のデータを要求する this.subscription.request(1); } @Override // (4) 処理が完了したとき public void onComplete() { System.out.println(Thread.currentThread().getName() + ":Subscriber:onComplete()"); } @Override // (5) エラーが発生したとき public void onError(Throwable throwable) { System.out.println("Subscriber:onError - " + throwable.getMessage()); } }
Flow.Publisherのsubscribeメソッドが実行されたときにonSubscribeメソッドがコールされるので、その中で(1)の通りrequestメソッドを使ってデータを要求します。実際にデータを受信すると(2)のonNextメソッドがコールされるので、その中でも(3)のように再度1つのデータを要求します。今回のサンプルコードでは1つずつデータを要求していますが、この個数はいくつでも問題なく、実際のデータ数よりも多くても問題はありません。ただし、Publisher側で準備できていない場合はデータを受信できません。
また、(4)はデータが終了したタイミングでの処理を記述し、(5)ではエラーによって終了した場合の処理を記述します。
続いて、実装したSampleSubscriberに1から10までの数値データを提供するサンプルコードをリスト2に示します。
// (1) Flow.Publisherのインスタンスを作成 final SubmissionPublisher publisher = new SubmissionPublisher(); // (2) Flow.Subscriberのインスタンスを作成 SampleSubscriber sub = new SampleSubscriber(); // (3) サブスクライブする publisher.subscribe(sub); // (4) タイマー処理を使って、0-10までの数値データを0.5秒ごとに作成する final Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { private int counter = 0; @Override public void run() { if(counter < 10) { // (5) 数値データを(1)で作成したオブジェクトに送信する System.out.println(Thread.currentThread().getName() + ":submit(" + counter + ")"); publisher.submit(counter); } else{ // (6) 処理を終了する publisher.close(); // (7) エラーで終了させたい場合 // publisher.closeExceptionally(new RuntimeException("unknown error")); timer.cancel(); } counter++; } }, 100, 500); // (8) SubmissionPublisherのconsumeメソッドでFutureを取得する CompletableFuture<Integer> f = publisher.consume(s -> {}); try { f.get(); // (9) 終了まで待つ } catch(Exception ex){ }
Flow.Publisherインターフェースを自分で実装するとなると、並列処理等を考慮する必要があり非常に面倒です。
しかし、Java 9ではすでにSubmissionPublisherというクラスが用意されているので、(1)の通りこのクラスを使うと便利です。
続いて(2)では、先ほど定義したSampleSubscriberクラスのインスタンスを作成し、そのインスタンスを(3)のようにsubscribeメソッドでPublisher側に登録します。
また、このサンプルコードでは(4)のようにデータ生成を行う処理でTimerを使って生成しています。実際には他のスレッドなどでデータが作成される場合が多く、そのケースに近づけるためです。
Timer処理の中ではカウンターがあり、(5)のように0~10までの数値データを生成し、10より大きくなると(6)の通り処理を終了させます。例えば、処理中に何らかのエラーが発生し、処理を終了させる必要がある場合には、(7)のようにcloseExceptionallyメソッドを使ってエラーをSubscriber側に通知します。
(8)(9)はメインスレッドが終了しないように、それらの処理の完了を待つためのコードです。
これらの処理を実行した結果が、リスト3です。
ForkJoinPool.commonPool-worker-9:Subscriber:onSubscribe(java.util.concurrent.SubmissionPublisher$BufferedSubscription@152f9150) Timer-0:submit(0) ForkJoinPool.commonPool-worker-9:Subscriber:onNext( 0 ) Timer-0:submit(1) ForkJoinPool.commonPool-worker-2:Subscriber:onNext( 1 ) Timer-0:submit(2) ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 2 ) Timer-0:submit(3) ForkJoinPool.commonPool-worker-9:Subscriber:onNext( 3 ) Timer-0:submit(4) ForkJoinPool.commonPool-worker-9:Subscriber:onNext( 4 ) Timer-0:submit(5) ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 5 ) Timer-0:submit(6) ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 6 ) Timer-0:submit(7) ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 7 ) Timer-0:submit(8) ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 8 ) Timer-0:submit(9) ForkJoinPool.commonPool-worker-2:Subscriber:onNext( 9 ) ForkJoinPool.commonPool-worker-2:Subscriber:onComplete()
この結果を見るとわかるように、データを生成しているTimer側の処理とSubscriber側の処理は異なるスレッド上で動いています。
また、実際の処理でも異なるスレッドが処理をしています。このように、スレッド間のデータ同期等を考慮しなくても、簡単に並列処理が記述できます。
Flow.Processorの実装
続いて、Flow.Processorインターフェースの実装例を紹介します。Processorは、Publisherでもあり、ストリームとしてのフィルタのような中間操作を行うことが可能です。Processorを組み合わせることで処理のパイプラインが作りやすくなります。実際に定義する場合もリスト4のように簡単に定義することができます。このコードでは、受信した数値が偶数の場合にデータを加工して次のSubscriberにデータを渡すProcessorを実装しています。
// (1) Flow.SubscriberとFlow.Publisherのインターフェースを持つクラスを実装 public class SampleProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(Integer item) { if(item % 2 == 0){ // (2) データをSubscriberに渡す submit("item " + item); } this.subscription.request(1); } @Override public void onComplete() { (3) 終了処理 this.close(); } @Override public void onError(Throwable throwable) { (4) エラー時の終了処理 this.closeExceptionally(throwable); } }
(1)では、Publisherインターフェースを持つSubmissionPublisherを継承してクラスを作り、そこにSubscriberでのメソッドを実装しています。
実際にデータのフィルタ処理と加工処理を行っているのが(2)の部分です。後は、Subscriberとして受信した終了情報を(3)(4)のようにPublisherとして実行します。