RxJavaリアクティブプログラミング (CodeZine BOOKS)を読んでいる。
RxJava 2が対象になっており、コードのほとんどでFlowableが使われている。Reactive StreamsやBackpressureに関する説明も豊富で、これまでAndroidなどクライアントサイドでRxJavaを使っていた人が読む本というより、サーバーサイドで使いたいって人の方が合っている気がする。
また、Flowableは同じくReactive Streams準拠でSpring Framework 5に同梱されるReactorのFlux / Monoとほとんど同じなので、Reactorを学ぶという意味でも十分使える。
RxJava2のメイン開発者であるDavid KarnokはReactorにもcontributeしているし、Reactorのメイン開発者であるStephane Maldiniと頻繁に情報交換しているようなので、APIの使い方や考え方はとても似ている。
(同じReactive Streams準拠でもAkka StreamsはScalaなので読めない...)
実際に「Chapter04 FlowableとObservableのオペレータ」のコードのReactor版を書いてみた。
https://github.com/making/reactor-reactive-programming
一部、Reactorが対応していないメソッドはあるが、サンプルコードのほとんどがそのままReactorにポートできた。
この本をReactorで写経すればFluxのちょうど良い練習になる。
写経して感じたRxJava2に対するReactorのメリットは以下3点
ReactorはJava8ベースなのでjava.util.function.*やjava.time.Durationが使える。
RxJava2はAndroidユーザーもターゲットとなっており、Java 6に対応しているが、ReactorはJava 8必須となっている。
そのため、Reactorではjava.util.function.Functionやjava.util.function.Consumer、java.util.function.Supplierなど、Java標準の関数クラスをそのまま使える。RxJava2ではio.reactivex.functions.Functionやio.reactivex.functions.Consumerが用意されている。
また、RxJava2では
Flowable<Long> flowable = Flowable.interval(100, TimeUnit.MILLISECONDS);
と書くところをRactorでは
Flux<Long> flux = Flux.interval(Duration.ofMillis(100));
と書ける。ほんのちょっとの違いだけれど、間隔を指定することが多いので、Durationを使えるのは嬉しい。
書籍では扱われていないが、Reactorではjava.util.concurrent.CompletionStage / CompletableFutureからMonoへの変換やjava.util.stream.StreamからFluxへの変換もサポートされている。Java 8ベースというのは大きい。
一方、java.util.functionを使うことにはデメリットがある。ラムダ内で例外ハンドリングしないといけない点である。
RxJava2の関数クラスにはthrow Exceptionが付いているため、そのままスローしてRxJava側でハンドリングすれば良い。
Reactorの場合、ラムダ内のチェック例外はcatchして処理するかre-throwする必要がある。
ReactorにはTupleがあって、zipメソッドなどの返り値に使える。
ReactorにはTupleが用意されている。
RxJava2で
Flowable<Long> flowable1 = Flowable.interval(300, TimeUnit.MILLISECONDS).take(5);
Flowable<Long> flowable2 = Flowable.interval(300, TimeUnit.MILLISECONDS).take(3).map(data -> data + 100);
Flowable<List<Long>> result = Flowable.zip(flowable1, flowable2, (data1, data2) -> Arrays.asList(data1, data2));
というようにzipper関数を渡す必要があるが、Reactorでは
Flux<Long> flux1 = Flux.interval(Duration.ofMillis(300)).take(5);
Flux<Long> flux2 = Flux.interval(Duration.ofMillis(500)).take(3).map(data -> data + 100);
Flux<Tuple2<Long, Long>> result = Flux.zip(flux1, flux2);
というように複数のFluxをzipに渡してそのまま返せる。もちろん通常はその後mapでデータ変換するので、zipper側で変換するか、一旦Tupleで返してmapで変換するかの違いではあるが、個人的にはTupleの方が好き。
Reactive Streamsに完全対応で用途別にFlux / Monoがある
ReactorはReactive Streamsに準拠したクラスが2つあり、0件または1件のデータを扱うためのMonoとn(>=0)件のデータを扱うFluxと役割が明確に分かれている。
RxJava2ではReactiveStreamsに準拠したクラスはFlowableのみであり、1件のデータを扱うSingleと0件または1件のデータを扱うMaybeはReactive Streamsに対応していない。(ちなみにRxJava1由来でn件のデータを扱うがBackpressureに対応していないObservableもある。主にクライアント用途だと思われる。)
例えばRxJava2ではIterableから変換する場合もCallableから変換する場合もFlowableである。
Flowable<String> flowable1 = Flowable.fromIterable(Arrays.asList("A", "B", "C"));
Flowable<Long> flowable2 = Flowable.fromCallable(() -> System.currentTimeMillis());
Callableの返り値は1件にしかなりえないため、ReactorではCallableからFluxを作るメソッドは用意されておらずMono側で提供される。
Flux<String> flux = Flux.fromIterable(Arrays.asList("A", "B", "C"));
Mono<Long> mono = Mono.fromCallable(() -> System.currentTimeMillis());
返り値が一件の場合、ReactorはMonoを返すが、RxJava2はSingleを返す。
// RxJava2
Single<Long> single = Flowable.just("a", "b", "c").count();
// Reactor
Mono<Long> single = Flux.just("a", "b", "c").count();
SingleはReactive Streams準拠ではない、つまりorg.reactivestreams.Publisherを実装していないため、互換性のためにPublisherを引数にしているメソッドにそのまま渡すことができない。
Java8が使える環境でRxJavaからの以降を考える必要がなければReactorの方が使いやすいと思う。また、Springユーザーは必然的にReactorを選択することになる。
とはいえRxJavaリアクティブプログラミングはReactorを勉強するのにも十分役に立つし、日本語で読めるのはとてもありがたい。
Chapter1やChapter3はReactive StreamsやBackpressure、スレッドの切り替えなどが詳しく書かれていてとても良いが、この手の技術を初めて使う人がいきなり読むには難しいかもしれないので、Chapter4のサンプルコードを写経してRxAPIを十分体験してからChapter1に戻った方が良さそう。
Spring 5でReactorに触れることになるであろう開発者にとってありがたい一冊になりそう。
