SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

Java 9で「変わること」と、Javaのこれまで

リアクティブストリームとCompletableFutureの改善

Java 9で「変わること」と、Javaのこれまで 第5回

  • X ポスト
  • このエントリーをはてなブックマークに追加

 前々回と前回でJava 9での大きな変更であるモジュール機能と新たなツールであるJShellの紹介をしました。今回からはJDKのコアライブラリに関する変更点を見ていきます。まず、ライブラリ面で最も大きな違いは、リアクティブストリームと呼ばれる並行処理に関する機能が新しく追加されていることです。リアクティブストリームといえば、RxJavaが有名ですが、今回追加された機能はRxJavaと比較して非常にシンプルなものです。しかし、十分便利な機能のなのでぜひ、利用してみることをおすすめします。

  • X ポスト
  • このエントリーをはてなブックマークに追加

リアクティブストリームとは

 リアクティブストリームについては、初回の「Java 9のリリースとこれまでのトレンドを振り返る」でも簡単に紹介しました。並列処理を効率的にするための方法のひとつで、パブリッシュ・サブスクライブ・モデル(以下、PubSubモデル)が使用されています。リアクティブストリームという言葉になじみがない方も、PubSubモデルといえばイメージがつく方もいるのではないでしょうか。

パブリッシュ・サブスクライブ・モデル

 このPubSubモデルの概念を示したものが図1です。PubSubモデルは、その名の通り、データを提供するオブジェクト(パブリッシュ)とデータを処理するオブジェクト(サブスクライブ)に分けて管理します。こうすることでデータの提供と処理を疎結合にすることができます。また多くの場合、データを提供する側と、処理をする側を多対多の関係にすることができるので、多様に発生するデータとその処理を行うためのコードを記述しやすくなります。

図1 パブリッシュ・サブスクライブ・モデルの概念図
図1 パブリッシュ・サブスクライブ・モデルの概念図

 似たような概念には、メッセージキューや、イベントとイベントリスナがあり、これらとの違いについて疑問に思う方もいるかと思います。

 PubSubモデルがメッセージキューと大きく異なる点として、複数の処理で同じデータを利用できることが挙げられます。メッセージキューはデータをすぐに処理せず、データと処理を分離するためにキューにためます。それを別のスレッドやプロセスに移して処理を行います。処理を並列化しやすく、同じ処理をできるだけ高速に行いたい場合に有用です。

 PubSubモデルは、複数の処理で同じデータを扱うことができるため、同じデータに対して独立した異なる処理をしたい場合に有効です。処理ごとにメッセージキューがあるイメージととらえることもできますが、処理を追加する側から見ると1つのキューとして扱うことになります。

 一方、同じデータで異なる処理をしたい場合には、イベントとイベントリスナでも実現が可能です。しかしこちらはイベントが発生したタイミングですぐに処理が行われてしまうため、イベントリスナ側の処理が遅い場合には問題が発生します。

 PubSubモデルはさまざまなところで使われています。例えばIoTで利用されるケースがあるMQTTサーバは、このPubSubモデルをミドルウェアとして提供しているサーバです。さまざまなデバイスが生成するデータを処理しなければならないIoTソリューションではよく使われるモデルです。そのため、PubSubという言葉を知らなくても、実際には使ったことがある方は多いのではないかと思います。

Flowクラスの役割

 リアクティブストリーム機能において、JDK9では新たにjava.util.concurrent.Flowクラスが追加されました。このクラスには以下に示す4つのインターフェースがあり、それらのインターフェースを用いてPubSubモデルを構築します。

  • Flow.Subscriber
  • Flow.Publisher
  • Flow.Processor
  • Flow.Subscription

 これらのインターフェースの関係を示したのが、図2です。

図2 Flowクラスのインターフェースの関係
図2 Flowクラスのインターフェースの関係

 Flow.Publisherは、外部から受け付けたデータをsubscribeメソッドにて登録されたFlow.Subscriberのオブジェクトに提供します。

 一方、Flow.Subscriberは登録が完了するとonSubscribeメソッドにて、Flow.Subscriptionのオブジェクトを取得します。データを受信したい場合にはrequestメソッドにて指定したN個のデータを要求し、Flow.Publisherはデータが要求されるとonNextメソッドを通じてデータをFlow.Subscriberに提供します。

 また、Flow.Publisherのcloseメソッドでデータ受信の完了を指示すると、Flow.SubscriberにはonCompleteメソッドが実行され、処理が終了するタイミングでSubscriber側でも処理をすることができます。

 Flow.ProcessorインターフェースはFlow.PublisherとFlow.Subscriberの双方のインターフェースを持つインターフェースです。例えば、あるSubscriberをPublisherとしても機能させ、別のSubscriberにデータを提供したい場合などに利用します。

 続いて、実際のコードを見ながら動作を確認します。リスト1は、Flow.Subscriberインターフェースを実装したサンプルコードです。

リスト1 Flow.Subscriberインターフェースの実装例(src/main/com/coltware/rx/flow/SampleSubscriber.javaの抜粋)
public class SampleSubscriber<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(Thread.currentThread().getName() + ":Subscriber:onSubscribe(" + subscription + ")");
        this.subscription = subscription;
        //  (1) 購読処理が完了したら、最初のデータを1つ要求する
        subscription.request(1);
    }
    @Override
    //  (2) データを受信する
    public void onNext(T item) {
        System.out.println(Thread.currentThread().getName() + ":Subscriber:onNext( " + item + " )");
        //  (3)次のデータを要求する
        this.subscription.request(1);
    }
    @Override
    // (4) 処理が完了したとき
    public void onComplete() {
        System.out.println(Thread.currentThread().getName() + ":Subscriber:onComplete()");
    }
    @Override
    //  (5) エラーが発生したとき
    public void onError(Throwable throwable) {
        System.out.println("Subscriber:onError - " + throwable.getMessage());
    }
}

 Flow.Publisherのsubscribeメソッドが実行されたときにonSubscribeメソッドがコールされるので、その中で(1)の通りrequestメソッドを使ってデータを要求します。実際にデータを受信すると(2)のonNextメソッドがコールされるので、その中でも(3)のように再度1つのデータを要求します。今回のサンプルコードでは1つずつデータを要求していますが、この個数はいくつでも問題なく、実際のデータ数よりも多くても問題はありません。ただし、Publisher側で準備できていない場合はデータを受信できません。

 また、(4)はデータが終了したタイミングでの処理を記述し、(5)ではエラーによって終了した場合の処理を記述します。

 続いて、実装したSampleSubscriberに1から10までの数値データを提供するサンプルコードをリスト2に示します。

リスト2 Flow.Subscriberインターフェースの実装例(src/main/com/coltware/rx/Main.javaの抜粋)
//  (1) Flow.Publisherのインスタンスを作成
final SubmissionPublisher publisher = new SubmissionPublisher();
//  (2) Flow.Subscriberのインスタンスを作成
SampleSubscriber sub = new SampleSubscriber();
//  (3) サブスクライブする
publisher.subscribe(sub);

//  (4) タイマー処理を使って、0-10までの数値データを0.5秒ごとに作成する
final Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
private int counter = 0;
@Override
public void run() {
    if(counter < 10) {
        //  (5) 数値データを(1)で作成したオブジェクトに送信する
        System.out.println(Thread.currentThread().getName() + ":submit(" + counter + ")");
            publisher.submit(counter);
        }
        else{
            //  (6) 処理を終了する
            publisher.close();
            //  (7) エラーで終了させたい場合
            // publisher.closeExceptionally(new RuntimeException("unknown error"));
            timer.cancel();
        }
        counter++;
    }
}, 100, 500);

//  (8) SubmissionPublisherのconsumeメソッドでFutureを取得する
CompletableFuture<Integer> f = publisher.consume(s -> {});
try {
    f.get(); // (9) 終了まで待つ
}
catch(Exception ex){
}

 Flow.Publisherインターフェースを自分で実装するとなると、並列処理等を考慮する必要があり非常に面倒です。

 しかし、Java 9ではすでにSubmissionPublisherというクラスが用意されているので、(1)の通りこのクラスを使うと便利です。

 続いて(2)では、先ほど定義したSampleSubscriberクラスのインスタンスを作成し、そのインスタンスを(3)のようにsubscribeメソッドでPublisher側に登録します。

 また、このサンプルコードでは(4)のようにデータ生成を行う処理でTimerを使って生成しています。実際には他のスレッドなどでデータが作成される場合が多く、そのケースに近づけるためです。

 Timer処理の中ではカウンターがあり、(5)のように0~10までの数値データを生成し、10より大きくなると(6)の通り処理を終了させます。例えば、処理中に何らかのエラーが発生し、処理を終了させる必要がある場合には、(7)のようにcloseExceptionallyメソッドを使ってエラーをSubscriber側に通知します。

 (8)(9)はメインスレッドが終了しないように、それらの処理の完了を待つためのコードです。

 これらの処理を実行した結果が、リスト3です。

リスト3 サンプルコードの実行結果
ForkJoinPool.commonPool-worker-9:Subscriber:onSubscribe(java.util.concurrent.SubmissionPublisher$BufferedSubscription@152f9150)
Timer-0:submit(0)
ForkJoinPool.commonPool-worker-9:Subscriber:onNext( 0 )
Timer-0:submit(1)
ForkJoinPool.commonPool-worker-2:Subscriber:onNext( 1 )
Timer-0:submit(2)
ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 2 )
Timer-0:submit(3)
ForkJoinPool.commonPool-worker-9:Subscriber:onNext( 3 )
Timer-0:submit(4)
ForkJoinPool.commonPool-worker-9:Subscriber:onNext( 4 )
Timer-0:submit(5)
ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 5 )
Timer-0:submit(6)
ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 6 )
Timer-0:submit(7)
ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 7 )
Timer-0:submit(8)
ForkJoinPool.commonPool-worker-11:Subscriber:onNext( 8 )
Timer-0:submit(9)
ForkJoinPool.commonPool-worker-2:Subscriber:onNext( 9 )
ForkJoinPool.commonPool-worker-2:Subscriber:onComplete()

 この結果を見るとわかるように、データを生成しているTimer側の処理とSubscriber側の処理は異なるスレッド上で動いています。

 また、実際の処理でも異なるスレッドが処理をしています。このように、スレッド間のデータ同期等を考慮しなくても、簡単に並列処理が記述できます。

Flow.Processorの実装

 続いて、Flow.Processorインターフェースの実装例を紹介します。Processorは、Publisherでもあり、ストリームとしてのフィルタのような中間操作を行うことが可能です。Processorを組み合わせることで処理のパイプラインが作りやすくなります。実際に定義する場合もリスト4のように簡単に定義することができます。このコードでは、受信した数値が偶数の場合にデータを加工して次のSubscriberにデータを渡すProcessorを実装しています。

リスト4 Flow.Processorの実装例(src/main/com/coltware/rx/flow/SampleProcessor.javaの抜粋)
// (1) Flow.SubscriberとFlow.Publisherのインターフェースを持つクラスを実装
public class SampleProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }
    @Override
    public void onNext(Integer item) {
        if(item % 2 == 0){
            //  (2) データをSubscriberに渡す
            submit("item " + item);
        }
        this.subscription.request(1);
    }
    @Override
    public void onComplete() {
        (3) 終了処理
        this.close();
    }
    @Override
    public void onError(Throwable throwable) {
        (4) エラー時の終了処理
        this.closeExceptionally(throwable);
    }
}

 (1)では、Publisherインターフェースを持つSubmissionPublisherを継承してクラスを作り、そこにSubscriberでのメソッドを実装しています。

 実際にデータのフィルタ処理と加工処理を行っているのが(2)の部分です。後は、Subscriberとして受信した終了情報を(3)(4)のようにPublisherとして実行します。

会員登録無料すると、続きをお読みいただけます

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

次のページ
拡張されたCompletableFuture

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
Java 9で「変わること」と、Javaのこれまで連載記事一覧

もっと読む

この記事の著者

WINGSプロジェクト 小林 昌弘(コバヤシ マサヒロ)

WINGSプロジェクトについて>有限会社 WINGSプロジェクトが運営する、テクニカル執筆コミュニティ(代表 山田祥寛...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

山田 祥寛(ヤマダ ヨシヒロ)

静岡県榛原町生まれ。一橋大学経済学部卒業後、NECにてシステム企画業務に携わるが、2003年4月に念願かなってフリーライターに転身。Microsoft MVP for Visual Studio and Development Technologies。執筆コミュニティ「WINGSプロジェクト」代表。主な著書に「独習シリーズ(Java・C#・Python・PHP・Ruby・JSP&サーブレットなど)」「速習シリーズ(ASP.NET Core・Vue.js・React・TypeScript・ECMAScript、Laravelなど)」「改訂3版JavaScript本格入門」「これからはじめるReact実践入門」「はじめてのAndroidアプリ開発 Kotlin編 」他、著書多数

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/10668 2018/03/01 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング