---
title: 「RxJavaリアクティブプログラミング」はSpring 5 / Reactorを勉強するのにも役に立つ書籍
tags: ["Reactor", "RxJava", "Spring 5", "Java", "Book"]
categories: ["Programming", "Java", "reactor"]
date: 2017-04-15T09:05:43Z
updated: 2017-04-16T05:39:37Z
---
RxJavaリアクティブプログラミング (CodeZine BOOKS)を読んでいる。
RxJava 2が対象になっており、コードのほとんどで[`Flowable`](http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html)が使われている。[Reactive Streams](http://www.reactive-streams.org/)やBackpressureに関する説明も豊富で、これまでAndroidなどクライアントサイドでRxJavaを使っていた人が読む本というより、サーバーサイドで使いたいって人の方が合っている気がする。
また、[`Flowable`](http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html)は同じくReactive Streams準拠でSpring Framework 5に同梱される[Reactor](https://projectreactor.io/)の[`Flux`](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) / [`Mono`](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html)とほとんど同じなので、Reactorを学ぶという意味でも十分使える。
RxJava2のメイン開発者である[David Karnok](https://twitter.com/akarnokd)はReactorにもcontributeしているし、Reactorのメイン開発者である[Stephane Maldini](https://twitter.com/smaldini)と頻繁に情報交換しているようなので、APIの使い方や考え方はとても似ている。
(同じReactive Streams準拠でも[Akka Streams](http://doc.akka.io/docs/akka/2.5.0/scala/stream/)はScalaなので読めない...)
実際に「Chapter04 FlowableとObservableのオペレータ」のコードのReactor版を書いてみた。
[https://github.com/making/reactor-reactive-programming](https://github.com/making/reactor-reactive-programming)
一部、Reactorが対応していないメソッドはあるが、サンプルコードのほとんどがそのままReactorにポートできた。
この本をReactorで写経すれば`Flux`のちょうど良い練習になる。
写経して感じたRxJava2に対するReactorのメリットは以下3点
### ReactorはJava8ベースなので`java.util.function.*`や`java.time.Duration`が使える。
RxJava2はAndroidユーザーもターゲットとなっており、Java 6に対応しているが、ReactorはJava 8必須となっている。
そのため、Reactorでは`java.util.function.Function`や`java.util.function.Consumer`、`java.util.function.Supplier`など、Java標準の関数クラスをそのまま使える。RxJava2では[`io.reactivex.functions.Function`](http://reactivex.io/RxJava/javadoc/io/reactivex/functions/Function.html)や[`io.reactivex.functions.Consumer`](http://reactivex.io/RxJava/javadoc/io/reactivex/functions/Consumer.html)が用意されている。
また、RxJava2では
``` java
Flowable flowable = Flowable.interval(100, TimeUnit.MILLISECONDS);
```
と書くところをRactorでは
``` java
Flux flux = Flux.interval(Duration.ofMillis(100));
```
と書ける。ほんのちょっとの違いだけれど、間隔を指定することが多いので、`Duration`を使えるのは嬉しい。
書籍では扱われていないが、Reactorでは`java.util.concurrent.CompletionStage` / `CompletableFuture`から`Mono`への変換や`java.util.stream.Stream`から`Flux`への変換もサポートされている。Java 8ベースというのは大きい。
一方、`java.util.function`を使うことにはデメリットがある。ラムダ内で例外ハンドリングしないといけない点である。
RxJava2の関数クラスには`throw Exception`が付いているため、そのままスローしてRxJava側でハンドリングすれば良い。
Reactorの場合、ラムダ内のチェック例外はcatchして処理するかre-throwする必要がある。
### Reactorには`Tuple`があって、`zip`メソッドなどの返り値に使える。
Reactorには`Tuple`が用意されている。
RxJava2で
``` java
Flowable flowable1 = Flowable.interval(300, TimeUnit.MILLISECONDS).take(5);
Flowable flowable2 = Flowable.interval(300, TimeUnit.MILLISECONDS).take(3).map(data -> data + 100);
Flowable> result = Flowable.zip(flowable1, flowable2, (data1, data2) -> Arrays.asList(data1, data2));
```
というように`zipper`関数を渡す必要があるが、Reactorでは
``` java
Flux flux1 = Flux.interval(Duration.ofMillis(300)).take(5);
Flux flux2 = Flux.interval(Duration.ofMillis(500)).take(3).map(data -> data + 100);
Flux> result = Flux.zip(flux1, flux2);
```
というように複数の`Flux`を`zip`に渡してそのまま返せる。もちろん通常はその後`map`でデータ変換するので、`zipper`側で変換するか、一旦`Tuple`で返して`map`で変換するかの違いではあるが、個人的には`Tuple`の方が好き。
### Reactive Streamsに完全対応で用途別に`Flux` / `Mono`がある
ReactorはReactive Streamsに準拠したクラスが2つあり、0件または1件のデータを扱うための`Mono`とn(>=0)件のデータを扱う`Flux`と役割が明確に分かれている。
RxJava2ではReactiveStreamsに準拠したクラスは`Flowable`のみであり、1件のデータを扱う[`Single`](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html)と0件または1件のデータを扱う[`Maybe`](http://reactivex.io/RxJava/javadoc/io/reactivex/Maybe.html)はReactive Streamsに対応していない。(ちなみにRxJava1由来でn件のデータを扱うがBackpressureに対応していない[`Observable`](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html)もある。主にクライアント用途だと思われる。)
例えばRxJava2では`Iterable`から変換する場合も`Callable`から変換する場合も`Flowable`である。
``` java
Flowable flowable1 = Flowable.fromIterable(Arrays.asList("A", "B", "C"));
Flowable flowable2 = Flowable.fromCallable(() -> System.currentTimeMillis());
```
`Callable`の返り値は1件にしかなりえないため、Reactorでは`Callable`から`Flux`を作るメソッドは用意されておらず`Mono`側で提供される。
``` java
Flux flux = Flux.fromIterable(Arrays.asList("A", "B", "C"));
Mono mono = Mono.fromCallable(() -> System.currentTimeMillis());
```
返り値が一件の場合、Reactorは`Mono`を返すが、RxJava2は`Single`を返す。
``` java
// RxJava2
Single single = Flowable.just("a", "b", "c").count();
// Reactor
Mono single = Flux.just("a", "b", "c").count();
```
`Single`はReactive Streams準拠ではない、つまり[`org.reactivestreams.Publisher`](http://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html)を実装していないため、互換性のために`Publisher`を引数にしているメソッドにそのまま渡すことができない。
----
Java8が使える環境でRxJavaからの以降を考える必要がなければReactorの方が使いやすいと思う。また、Springユーザーは必然的にReactorを選択することになる。
とはいえRxJavaリアクティブプログラミングはReactorを勉強するのにも十分役に立つし、日本語で読めるのはとてもありがたい。
Chapter1やChapter3はReactive StreamsやBackpressure、スレッドの切り替えなどが詳しく書かれていてとても良いが、この手の技術を初めて使う人がいきなり読むには難しいかもしれないので、Chapter4のサンプルコードを写経してRxAPIを十分体験してからChapter1に戻った方が良さそう。
Spring 5でReactorに触れることになるであろう開発者にとってありがたい一冊になりそう。