Shoeisha Technology Media

CodeZine(コードジン)

記事種別から探す

RxJava(2.x)の最初に知っておいてもらいたいオペレータ

RxJavaによるリアクティブプログラミング入門(5)

  • LINEで送る
  • このエントリーをはてなブックマークに追加
2017/01/24 14:00

 この連載はRxJavaを使って、リアクティブプログラミングにおけるポイントやRxJavaが持つ機能について学んでいくことを目的としています。前回はRxJava 2.xを使って簡単なサンプルを実装しました。今回はFlowableやObservableの生成、通知するデータの選別や変換などを行い、新たなFlowableやObservableなどを生成するオペレータについて見ていきます。今回はRxJavaを始めるにあたって、まず知っておいてもらいたいオペレータについてRxJava 2.xで用意されている代表的なものを見ていきます。

目次

対象読者

  • Java経験者(初心者可)
  • RxJava未経験者
  • リアクティブプログラミング未経験者

 ※ただし、前回までの連載を読んでいる前提です。

RxJavaのオペレータとは

 RxJavaのオペレータとは、前述したように新たにFlowableやObservableを生成したり、生成したFlowableやObservableが通知するデータの選別や変換などを行ったデータを通知する新たなFlowableやObservableなどを生成したりするメソッドのことです。オペレータの結果として生成されるものは、基本的にはFlowableから呼ばれればFlowableで、Observableから呼ばれればObservableを生成します。例外として、最終的に通知するデータが1件以内の場合、戻り値が今回はまだ紹介しないSingleやMaybeやCompletableになることもありますが、基本的な考え方は同じです。この戻り値がFlowable/Observableを返す性質からメソッドをつなげていくことで、データがいくつかのオペレータの処理を通過し消費者に対して使いやすくなったデータを通知する最終的なFlowable/Observableを生成できます。

 また、実装でもオペレータをつなげていくメソッドチェインで、実装を簡潔にすることも可能です。例えば、次のサンプルはjustメソッドを使って引数のデータを通知するFlowableを生成し、filterメソッドを使って、そのFlowableが通知するデータを偶数のみにし、mapメソッドを使って通知するデータを10倍にしているFlowableをメソッドチェインを使って実装しています。

メソッドチェインの例
public static void main(String[] args) {
  Flowable<Integer> result = Flowable
      // 引数のデータを通知するFlowableを生成
      .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      // 偶数のデータのみを通知する
      .filter(data -> data % 2 == 0)
      // 通知するデータを10倍にする
      .map(data -> data * 10);
  
  // 購読する
  result.subscribe(data -> System.out.println(data));
}

 これを実行すると次の結果が出力されます。

実行結果
20
40
60
80
100

 これをメソッドチェインを使わずに実装すると次のようになります。

public static void main(String[] args) {
  // 引数のデータを通知するFlowableを生成
  Flowable<Integer> flowable1 = Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  // 偶数のデータのみを通知する
  Flowable<Integer> flowable2 = flowable1.filter(data -> data % 2 == 0);
  // 通知するデータを10倍にする
  Flowable<Integer> result = flowable2.map(data -> data * 10);
  
  // 購読する
  result.subscribe(data -> System.out.println(data));
}

 このようにメソッドチェインを使わないと、途中で次のメソッドを呼ぶためだけの不要なインスタンスを定義する必要が出てきてしまいます。しかし、メソッドチェインを使うとこのような不要なインスタンスを生成する必要がなくなり、変数名を間違えてしまったりするようなことがなくなります。また、元のデータを最終的にどうしたいのかも、実装の一連の流れから分かりやすくなるメリットもあります。

 また、一見するとデザインパターンの一つであるビルダーパターンのように見えますが、ビルダーパターンと異なり、オペレータを設定した時点でオペレータの処理を適用した新たなFlowableが生成されています。さらにこのことは、オペレータをつなげる順序によってどのようなデータが通知されるのかにも影響し、かつパフォーマンスにも影響します。例えば、10倍した後に偶数かどうかを判定するとすべてのデータが偶数になりますし、仮に10倍せず違う処理をしてデータのフィルターした場合、意図したようにデータのフィルターができたとしても、最終的に通知されない不要なデータに対して処理を行っていることになり、それがコストのかかる処理の場合は無駄にコストをかけていることになります。そのためRxJavaでメソッドチェインを使う際はオペレータの順番について何が効率的なのかを意識しないといけません。

 加えて、RxJavaのオペレータの多くは引数に関数型インターフェースを受け取るようになっています。関数型インターフェースとは簡単に説明すると、実装すべきメソッドを1つだけ持つインターフェースです。この関数型インターフェースを実装するのにJava 8からラムダ式という関数型インターフェースの実装を簡易化した記述ができるようになっています。ここではラムダ式については特に説明しませんが、ラムダ式を使うと簡潔に実装ができ、かつ可読性も上がることが多いです。RxJavaの多くのオペレータは引数に関数型インターフェースを受け取るようになっているので、ラムダ式を使えるようにした方がよいでしょう。また、Java 8が使えない環境の場合、retrolambdaなどのラムダ式を古いJavaのバージョンでも使えるようにするライブラリもあるので、可能ならばラムダ式を使って実装したほうが良いでしょう。

 では、今回のサンプルが何を行っているのかについて細かく見ていきましょう。まずjustメソッドを使って引数のデータである次のデータを通知するFlowableを生成しています。

通知するデータ
1
2
3
4
5
6
7
8
9
10

 このFlowableに対しfilterメソッドを使って、偶数のデータのみを通知するようにしています。filterメソッドの引数にはRxJavaのPredicateの関数型インターフェースを受け取るようになっており、どのデータを通知するのかを判別しています。また、このPredicateはJava 8のPredicateではないので、パッケージがio.reactivex.functionsになっているのと、Exceptionをthrowできるようになっていることに注意してください。

filterメソッド
filter(Predicate<? super T> predicate)
io.reactivex.functions.Predicateの定義
public interface Predicate<T> {
  boolean test(T t) throws Exception;
}

 このfilterメソッドのPredicateはどのデータを通知するのかを判定する関数型インターフェースで、実装すべきメソッドとしてtestメソッドを持っています。このtestメソッドは引数に通知するデータを受け取り、そのデータを通知するかどうかを判定し、通知するならtrueを、通知しないならfalseを返すように定義します。今回のサンプルのfilterメソッドでは偶数なら通知するように実装しています。

偶数のみを通知するようにする実装
.filter(data -> data % 2 == 0)

 このサンプルはラムダ式を使って実装していますが、これをラムダ式を使わずに実装すると次のようになります。

ラムダ式を使わない場合
.filter(new Predicate<Integer>() {
  
  @Override
  public boolean test(Integer data) throws Exception {
    return data % 2 == 0;
  }
})

 つまり、testメソッドで通知データであるdataを受け取り、それが偶数ならtrueを返すようにしています。そうすることで、ここでは結果がtrueになるデータのみが通知されるようになり、このfilterメソッドから生成されたFlowableは次のデータを通知します。

通知するデータ
2
4
6
8
10

 そして、このFlowableに対しmapメソッドを使って、通知するデータを10倍するようにしています。mapメソッドの引数にはRxJavaのFunctionの関数型インターフェースを受け取るようになっており、受け取ったデータをどのように変換するのかを定義しています。また、このFunctionはJava 8のFunctionではないので、パッケージがio.reactivex.functionsになっているのと、Exceptionをthrowできるようになっていることに注意してください。

mapメソッド
map(Function<? super T, ? extends R> mapper)
io.reactivex.functions.Functionの定義
public interface Function<T, R> {
  R apply(T t) throws Exception;
}

 このmapメソッドのFunctionは受け取ったデータをどのように変換して通知するのかを定義する関数型インターフェースで、実装すべきメソッドとしてapplyメソッドを持っています。このapplyメソッドは引数に通知するデータを受け取り、そのデータから新たなデータを生成して返します。そしてこの戻り値のデータが結果として通知されるデータとなります。このサンプルのmapメソッドでは受け取ったデータを10倍して返すように実装しています。

10倍したデータを通知するようにする実装
.map(data -> data * 10)

 このサンプルはラムダ式を使って実装していますが、これをラムダ式を使わずに実装すると次のようになります。

ラムダ式を使わない場合
.map(new Function<Integer, Integer>() {
  
  @Override
  public Integer apply(Integer data) throws Exception {
    return data * 10;
  }
})

 つまり、applyメソッドで通知データであるdataを受け取り、それを10倍したデータを返すようにしています。そして、その戻り値が通知されるデータとなります。このmapメソッドから生成されたFlowableは次のデータを通知します。

通知するデータ
20
40
60
80
100

 それでは、簡単にオペレータの実装例を見たところで、RxJavaにはどのようなオペレータが用意されているのか、主なオペレータについて見ていきましょう。今回のサンプルはFlowableを使って実装していますが、Observableの場合もバックプレッシャーを扱わない限り、基本的にはほぼ同じ使い方になります。また、今後のサンプルではデバッグ用のSubscriberとして次のものを用意しています。

DebugSubscriber.java
/** サンプル用のSubscriber */
public class DebugSubscriber<T> extends ResourceSubscriber<T> {
  
  private String label;
  
  public DebugSubscriber() {
    super();
  }
  
  public DebugSubscriber(String label) {
    super();
    this.label = label;
  }
  
  @Override
  public void onNext(T data) {
    // onNextメソッドの呼び出し時に出力
    String threadName = Thread.currentThread().getName();
    if (label == null) {
      System.out.println(threadName + ": " + data);
    } else {
      System.out.println(threadName + ": " + label + ": " + data);
    }
  };
  
  @Override
  public void onError(Throwable throwable) {
    // onErrorメソッドの呼び出し時に出力
    String threadName = Thread.currentThread().getName();
    if (label == null) {
      System.out.println(threadName + ": エラー = " + throwable);
    } else {
      System.out.println(threadName + ": " + label + ": エラー = " + throwable);
    }
  }
  
  @Override
  public void onComplete() {
    // onCompleteメソッドの呼び出し時に出力
    String threadName = Thread.currentThread().getName();
    if (label == null) {
      System.out.println(threadName + ": 完了");
    } else {
      System.out.println(threadName + ": " + label + ": 完了");
    }
  }
}

 サンプルでは、Subscriberで特殊なことをしない限りは、このDebugSubscriberを使っています。


  • LINEで送る
  • このエントリーをはてなブックマークに追加

著者プロフィール

  • 須田 智之(スダ トモユキ)

    十数年間おもにSI企業にシステムエンジニアとして携わり、現在はフリーランスに。企業向けのシステム開発のかたわら個人でのモバイルアプリの開発やIT分野の記事も執筆。RxJava 2.0に対応した著書『RxJavaリアクティブプログラミング』が2017年2月16日より発売中。

バックナンバー

連載:RxJavaによるリアクティブプログラミング入門
All contents copyright © 2005-2017 Shoeisha Co., Ltd. All rights reserved. ver.1.5