IK.AM

@making's tech note


はじめてのSpring WebFlux (その1 - Spring WebFluxを試す)

🗃 {Programming/Java/org/springframework/web/reactive}
🏷 Reactor 🏷 Reactor Netty 🏷 Netty 🏷 Spring 5 🏷 Spring WebFlux 🏷 Java 
🗓 Updated at 2017-06-06T18:11:23Z  🗓 Created at 2017-05-08T18:56:28Z   🌎 English Page

この記事の内容はSpring 5.0.0.RC1時点でのものです。5.0.0.RELEASE時での動作保証はありません。またSNAPSHOTを使っているため、このままでは動作しないこともあります。

目次

はじめに

Spring WebFluxはSpring 5から追加された、ノンブロッキングなランタイム上でリアクティブプログラミングできる新しいWebフレームワークです。

image

上の図が示す通り、これまでのSpring MVCの横に位置するコンポーネントです。 これまでのSpring MVCがサーブレットコンテナ上でServlet APIをベースにしてきたフレームワークであったのに対し、 Spring WebFluxはServlet APIは使用せず、Reactive Streams及びその実装であるReactorをベースとした新しいHTTP API上に実装されています。ランタイムとしてはNetty、Undertow(サーブレットコンテナじゃない方)といったノンブロッキングランタイムが使えます。またServlet 3.1で導入されたNon-Blocking APIを使用したTomcat, Jetty実装も用意されています。ただし、Spring WebFluxからはServlet APIは見えません。

Spring WebFlux上のプログラミングモデルとしては

  • @Controlller
  • Router Functions

の2パターンが用意されています。前者はこれまでSpring MVCで使用してきたアノテーションベースのContoller実装方法そのものです。 つまりランタイムは変わるけどControllerのプログラミング方法は同じです。 後者の方はラムダベースの全く新しいControllerの実装方法になります。Node.jsのExpressみたいなものをイメージしていただければと思います。

本記事では両方の簡単な使い方を紹介します。

ちなみに詳しくはJava Day Tokyo 2017のD1-D5で話すので聞きに来てください。 http://www.oracle.co.jp/events/javaday/2017

プロジェクト作成

まずはプロジェクトを作成します。Spring Boot 2.0からSpring 5に対応しています。

curlコマンドで雛形プロジェクトを作成します。Windowsの場合はBashから実行してください。

curl https://start.spring.io/starter.tgz \
       -d bootVersion=2.0.0.BUILD-SNAPSHOT \
       -d artifactId=hello-webflux \
       -d baseDir=hello-webflux \
       -d dependencies=webflux \
       -d applicationName=HelloWebFluxApplication | tar -xzvf -

できたhello-webfluxプロジェクトをIDEで開いてください。

以下の説明で、Streamと書いている場合は連続するデータのことを指し、Streamと書いている場合はJava8のjava.util.stream.Streamを指します。

@Controllerモデル

まずは@ControllerモデルのHello Worldを試します。

Hello World

src/main/java/com/example/HelloController.javaを作成して下さい。

package com.example;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }
}

Fluxに見慣れない方はいると思いますが、ぱっと見、今まで書いてきたSpring MVCのControllerと全く同じです。

com.example.HelloWebFluxApplicationクラスのmainメソッドを実行するとSpring WebFluxのアプリケーションが立ち上がります。

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::  (v2.0.0.BUILD-SNAPSHOT)

2017-05-09 03:25:20.123  INFO 92125 --- [           main] com.example.HelloWebFluxApplication      : Starting HelloWebFluxApplication on jpxxmakitm1.corp.emc.com with PID 92125 (/private/tmp/hello-webflux/target/classes started by makit in /private/tmp/hello-webflux)
2017-05-09 03:25:20.128  INFO 92125 --- [           main] com.example.HelloWebFluxApplication      : No active profile set, falling back to default profiles: default
2017-05-09 03:25:20.199  INFO 92125 --- [           main] .r.c.ReactiveWebServerApplicationContext : Refreshing org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext@769e7ee8: startup date [Tue May 09 03:25:20 JST 2017]; root of context hierarchy
2017-05-09 03:25:20.877  WARN 92125 --- [           main] o.h.v.m.ParameterMessageInterpolator     : HV000184: ParameterMessageInterpolator has been chosen, EL interpolation will not be supported
2017-05-09 03:25:21.045  WARN 92125 --- [           main] o.h.v.m.ParameterMessageInterpolator     : HV000184: ParameterMessageInterpolator has been chosen, EL interpolation will not be supported
2017-05-09 03:25:21.269  INFO 92125 --- [           main] s.w.r.r.m.a.RequestMappingHandlerMapping : Mapped "{[/],methods=[GET]}" onto reactor.core.publisher.Flux<java.lang.String> com.example.HelloController.hello()
2017-05-09 03:25:21.296  INFO 92125 --- [           main] o.s.w.r.handler.SimpleUrlHandlerMapping  : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.reactive.resource.ResourceWebHandler]
2017-05-09 03:25:21.297  INFO 92125 --- [           main] o.s.w.r.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**] onto handler of type [class org.springframework.web.reactive.resource.ResourceWebHandler]
2017-05-09 03:25:21.381  INFO 92125 --- [           main] o.s.w.r.r.m.a.ControllerMethodResolver   : Looking for @ControllerAdvice: org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext@769e7ee8: startup date [Tue May 09 03:25:20 JST 2017]; root of context hierarchy
2017-05-09 03:25:21.947  INFO 92125 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2017-05-09 03:25:22.042  INFO 92125 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
2017-05-09 03:25:22.048  INFO 92125 --- [           main] com.example.HelloWebFluxApplication      : Started HelloWebFluxApplication in 3.044 seconds (JVM running for 3.679)

いつものSpring BootならTomcatが立ち上がっていましたが、今回はNettyが起動していることがわかります。

curllocalhost:8080にアクセスしてください。

$ curl -v localhost:8080
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/plain;charset=UTF-8
< 
HelloWorld

HellWorldが表示されました。

FluxはReactive StreamsのPublisherを実装したN要素のストリームを表現するReactorのクラスです。 デフォルトではtext/plainでレスポンスが帰りましたが、

  • Server-Sent Event
  • JSON Stream

で返すこともできます。

Server-Sent Eventで返す場合はAcceptヘッダにtext/event-streamを指定してください。

$ curl -v localhost:8080 -H 'Accept: text/event-stream'
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: text/event-stream
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream
< 
data:Hello

data:World

JSON Streamで返すときはAcceptヘッダにapplication/stream+jsonを指定しますが、今回のケース(文字列ストリーム)では見た目がtext/plainの時と変わりません。

$ curl -v localhost:8080 -H 'Accept: application/stream+json'
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: application/stream+json
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/stream+json;charset=UTF-8
< 
HelloWorld

無限Stream

次はもう少しStream感のあるレスポンスを返しましょう。

FluxStreamから生成することもできます。

次のstreamメソッドを作成して、無限Streamを作成し、そのうちの10件をFluxに変換して返却してください。

package com.example;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }

    @GetMapping("/stream")
    Flux<Map<String, Integer>> stream() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1); // Java8の無限Stream
        return Flux.fromStream(stream.limit(10))
                .map(i -> Collections.singletonMap("value", i));
    }
}

com.example.HelloWebFluxApplicationクラスを再起動してください。

/streamに対する三種類のレスポンスはそれぞれ、

通常のJSON

$ curl -v localhost:8080/stream
> GET /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/json;charset=UTF-8
< 
[{"value":0},{"value":1},{"value":2},{"value":3},{"value":4},{"value":5},{"value":6},{"value":7},{"value":8},{"value":9}]

Server-Sent Event

$ curl -v localhost:8080/stream -H 'Accept: text/event-stream'
> GET /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: text/event-stream
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream
< 
data:{"value":0}

data:{"value":1}

data:{"value":2}

data:{"value":3}

data:{"value":4}

data:{"value":5}

data:{"value":6}

data:{"value":7}

data:{"value":8}

JSON Stream

$ curl -v localhost:8080/stream -H 'Accept: application/stream+json'
> GET /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: application/stream+json
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/stream+json;charset=UTF-8
< 
{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
{"value":5}
{"value":6}
{"value":7}
{"value":8}
{"value":9}

となります。これで普通のJSON(application/json)とJSON Stream(application/stream+json)の違いがわかったと思います。

実はlimitをつける必要はなく、そのまま無限Streamを返すこともできます。 これは普通のControllerの発想だと永遠にレスポンスが返ってこないことになります。実はapplication/jsonの場合はレスポンスが返りません(Integerがオーバーフローしたら返るかも?)。 なのでまずはあえてlimitをつけました。Server-Sent EventとJSON Streamは無限Streamを返すこともできます。 試してみましょう。

package com.example;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }

    @GetMapping("/stream")
    Flux<Map<String, Integer>> stream() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        return Flux.fromStream(stream) // limitを外す
                                .map(i -> Collections.singletonMap("value", i));
    }
}

Server-Sent EventもJSON Streamも次のように返りますが、Ctrl+Cで止めるまですごいスピードでストリームが流れます。

Server-Sent Event

$ curl -v localhost:8080/stream -H 'Accept: text/event-stream'
> GET /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: text/event-stream
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream
< 
data:{"value":0}

data:{"value":1}

data:{"value":2}

data:{"value":3}

data:{"value":4}

data:{"value":5}

data:{"value":6}

data:{"value":7}

data:{"value":8}

data:{"value":9}

data:{"value":10}

data:{"value":11}

data:{"value":12}

data:{"value":13}

data:{"value":14}

... 

JSON Stream

$ curl -v localhost:8080/stream -H 'Accept: application/stream+json'
> GET /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: application/stream+json
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/stream+json;charset=UTF-8
< 
{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
{"value":5}
{"value":6}
{"value":7}
{"value":8}
{"value":9}
{"value":10}
{"value":11}
{"value":12}
{"value":13}
{"value":14}
...

Streamをゆっくり返したい場合はFlux.interval(Duration)とzipしたStreamを返します。

package com.example;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }

    @GetMapping("/stream")
    Flux<Map<String, Integer>> stream() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        return Flux.fromStream(stream).zipWith(Flux.interval(Duration.ofSeconds(1)))
                .map(tuple -> Collections.singletonMap("value", tuple.getT1() /* タプルの1つ目要素 = Stream<Integer>の要素 */));
    }
}

これで毎秒1件ずつゆっくりデータが返ってきます。

$ curl -v localhost:8080/stream -H 'Accept: application/stream+json'
> GET /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: application/stream+json
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/stream+json;charset=UTF-8
< 
{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
{"value":5}

POSTの例

次にPOSTの例も試します。

リクエストボディの文字列を大文字にして返すechoメソッドを作ります。

package com.example;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }

    @GetMapping("/stream")
    Flux<Map<String, Integer>> stream() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        return Flux.fromStream(stream).zipWith(Flux.interval(Duration.ofSeconds(1)))
                .map(tuple -> Collections.singletonMap("value", tuple.getT1()));
    }

    @PostMapping("/echo")
    Mono<String> echo(@RequestBody Mono<String> body) {
        return body.map(String::toUpperCase);
    }
}

普通のControllerと同じく、@RequestBodyでリクエストボディを受け取れます。Spring WebFluxではリクエストボディ(ここでは文字列)をMonoにラップして受け取ることで、非同期で処理をchain / composeすることができます。 (ちなみにMonoでラップせずStringで受けた場合はノンブロッキングで同期な処理になります。) この例ではリクエストボディを大文字に変換するmapの結果のMonoをそのまま返しています。Monoは1または0要素のPublisherです。

$ curl localhost:8080/echo -d hoge
HOGE

Monoの部分をFluxに変えてもOKです。一件しか扱わないことを前提の処理であればMonoを使った方が明示的です。 逆に複数件数のStreamを扱いたい場合はFluxにします。

次の例では@PostMappingをつけたstreamメソッドにでStreamをFluxで受け取り、キーvalueに対する値の数値を2倍にした値をキーfooに入れたMapに変換して返却しています。

package com.example;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }

    @GetMapping("/stream")
    Flux<Map<String, Integer>> stream() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        return Flux.fromStream(stream).zipWith(Flux.interval(Duration.ofSeconds(1)))
                .map(tuple -> Collections.singletonMap("value",
                        tuple.getT1()));
    }

    @PostMapping("/echo")
    Mono<String> echo(@RequestBody Mono<String> body) {
        return body.map(String::toUpperCase);
    }

    @PostMapping("/stream")
    Flux<Map<String, Integer>> stream(@RequestBody Flux<Map<String, Integer>> body) {
        return body.map(m -> Collections.singletonMap("double", m.get("value") * 2));
    }
}

JSON StreamでPOSTしてServer-Sent Eventで受信するようにContent-TypeヘッダとAcceptヘッダを指定します。

$ curl -v localhost:8080/stream -d '{"value":1}{"value":2}{"value":3}' -H 'Content-Type: application/stream+json'  -H 'Accept: text/event-stream'
> POST /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Content-Type: application/stream+json
> Accept: text/event-stream
> Content-Length: 33
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream
< 
data:{"double":2}

data:{"double":4}

data:{"double":6}

Router Functionsモデル

次にもう一つのプログラミングモデルであるRouter Functionsを試します。

Route FunctionsではPOJOに@RestController@GetMappingなどのアノテーションをつけてルーティングを定義する代わりに、 パスとハンドラー関数(ラムダ)の組み合わせでルーティングを定義します。

Spring Boot 2.0では現状、Router Functionsと@Controllerモデルは共存できないので(両方使うと@Controllerが無視される?)、先ほど作成したHelloControllerを削除してください。

Hello World

@Controllerの時と同じようにGET /でFluxのHello Worldを返すRoutingの定義を書きましょう。Spring Boot 2.0ではRouterFunction<ServerResponse>なBean定義があれば、それをRouter Functionsのルーティング定義とみなします。

まずは簡単に、Bean定義中にルーティング定義を直接書きます。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route(RequestPredicates.GET("/"), req -> ServerResponse
                .ok().body(Flux.just("Hello", "World!"), String.class));
    }
}

個人的にはRouterFunctions.*RequestPredicates.*ServerResponse.*をstatic importするのが好みです。IntelliJ IDEAならOption+Enterで"Add static import for ..."を選択。

static importすると次のように書けます。

package com.example;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes() {
        return route(GET("/"),
                req -> ok().body(Flux.just("Hello", "World!"), String.class));
    }
}

HelloWebFluxApplicationを再起動して、curllocalhost:8080にアクセスしてください。

$ curl -v localhost:8080
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/plain;charset=UTF-8
< 
Hello World

Router Functionsの場合、アノテーションの読み込み(= Reflection処理)がないため、原理的には@Controllerよりも若干速いはずです。

なお、Publisherではなく、ハードコード文字列など同期なレスポンスボディを返せば良い場合は、次のようにsyncBodyと言うショートカットメソッドも用意されています。

package com.example;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes() {
        return route(GET("/"), req -> ok().syncBody("Hello World!"));
    }
}

もっというとbody(Publisher<T> publisher, T clazz)は次の処理のショートカットです。

package com.example;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes() {
        return route(GET("/"), req -> ok().body(
                BodyInserters.fromPublisher(Flux.just("Hello", "World!"), String.class)));
    }
}

これもstatic importすると次のように書けます。

package com.example;

import static org.springframework.web.reactive.function.BodyInserters.fromPublisher;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes() {
        return route(GET("/"), req -> ok()
                .body(fromPublisher(Flux.just("Hello", "World!"), String.class)));
    }
}

レスポンスボディの指定をこだわる場合はBodyInsertersにいろいろメソッドが用意されています。

さて、Bean定義中にラムダでRouting定義を書ききっても良いですが、量が多くなると見通しが悪くなるので、通常はHandlerクラスを用意して、メソッド参照します。

src/main/java/com/example/HelloHandler.javaを作成して、Hello Worldの処理を移しましょう。

package com.example;

import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class HelloHandler {

    public Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }
}

Bean定義を次のように変えます。

package com.example;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes(HelloHandler helloHandler) {
        return route(GET("/"), helloHandler::hello);
    }
}

リクエストの処理だけでなく、Routing定義もHandlerにもたせたい場合は次のようなroutesメソッドをHandler側に作るのがオススメです。

package com.example;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return route(GET("/"), this::hello);
    }

    Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }
}

Bean定義がHandlerのroutesを呼び出すだけです。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes(HelloHandler helloHandler) {
        return helloHandler.routes();
    }
}

このようにした場合、複数のHandlerのroutesを合成することもでき、見通しがよりよくなります。

    @Bean
    RouterFunction<ServerResponse> routes(HelloHandler helloHandler, FooHandler fooHandler) {
        return helloHandler.routes().and(fooHandler.routes());
    }

無限Stream

@Controllerの場合とほぼ同じですが、AcceptヘッダによるContent-Negotiationが(多分)なく、レスポンス作成時にcontentTypeメソッドでContent-Typeを明示します。

Routing定義はroutesメソッド内にaddRouteメソッドでchainできます。

package com.example;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import java.util.stream.Stream;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return route(GET("/"), this::hello)
                .andRoute(GET("/stream"), this::stream);
    }

    Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }

    Mono<ServerResponse> stream(ServerRequest req) {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        Flux<Integer> flux = Flux.fromStream(stream);
        return ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(flux,
                Integer.class);
    }
}

Content-Typeを明示しているのでアクセスする際はAcceptヘッダ不要です。

$ curl -v localhost:8080/stream 
> GET /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/stream+json
< 
0
1
2
3
4
5
6
...

Map<String, Integer>などGenericsな型で返す場合は、BodyInserters.fromPublisher(P publisher, ResolvableType elementType)を使います。

ちょっと面倒に見えるかもしれません。

package com.example;

import static org.springframework.web.reactive.function.BodyInserters.fromPublisher;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return route(GET("/"), this::hello).andRoute(GET("/stream"), this::stream);
    }

    public Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }

    public Mono<ServerResponse> stream(ServerRequest req) {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        Flux<Map<String, Integer>> flux = Flux.fromStream(stream)
                .map(i -> Collections.singletonMap("value", i));
        return ok().contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(fromPublisher(flux, ResolvableType.forClassWithGenerics(Map.class,
                        String.class, Integer.class)));
    }
}

HelloWebFluxApplicationクラスを再起動して、/streamにアクセスすると無限JSON Streamが返ります。

$ curl -v localhost:8080/stream 
> GET /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/stream+json
< 
{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
{"value":5}
{"value":6}
{"value":7}
{"value":8}
{"value":9}
{"value":10}
{"value":11}
{"value":12}
{"value":13}
{"value":14}
...

POSTの例

POSTの場合は、RequestPredicates.POSTを使ってルーティングを定義するのと、 リクエストボディをServerRequest.bodyToMonoあるいはServerRequest.bodyToFluxを使えば、これまで説明したことと特に変わりはありません。

package com.example;

import static org.springframework.web.reactive.function.BodyInserters.fromPublisher;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return route(GET("/"), this::hello)
                .andRoute(GET("/stream"), this::stream)
                .andRoute(POST("/echo"), this::echo);
    }

    public Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }

    public Mono<ServerResponse> stream(ServerRequest req) {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        Flux<Map<String, Integer>> flux = Flux.fromStream(stream)
                .map(i -> Collections.singletonMap("value", i));
        return ok().contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(fromPublisher(flux, ResolvableType.forClassWithGenerics(Map.class,
                        String.class, Integer.class)));
    }

    public Mono<ServerResponse> echo(ServerRequest req) {
        Mono<String> body = req.bodyToMono(String.class).map(String::toUpperCase);
        return ok().body(body, String.class);
    }
}

POST /streamも同じですが、Request BodyをGenerics型のPublisherで受ける場合はServerRequest.bodyToFluxではなく、ServerRequest.bodyメソッドにBodyInsertersの反対のBodyExtractorsを渡します。

package com.example;

import static org.springframework.web.reactive.function.BodyExtractors.toFlux;
import static org.springframework.web.reactive.function.BodyInserters.fromPublisher;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return route(GET("/"), this::hello)
                .andRoute(GET("/stream"), this::stream)
                .andRoute(POST("/echo"), this::echo)
                .andRoute(POST("/stream"), this::postStream);
    }

    public Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }

    public Mono<ServerResponse> stream(ServerRequest req) {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        Flux<Map<String, Integer>> flux = Flux.fromStream(stream)
                .map(i -> Collections.singletonMap("value", i));
        return ok().contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(fromPublisher(flux, ResolvableType.forClassWithGenerics(Map.class,
                        String.class, Integer.class)));
    }

    public Mono<ServerResponse> echo(ServerRequest req) {
        Mono<String> body = req.bodyToMono(String.class).map(String::toUpperCase);
        return ok().body(body, String.class);
    }

    public Mono<ServerResponse> postStream(ServerRequest req) {
        Flux<Map<String, Integer>> body = req.body(toFlux(ResolvableType
                .forClassWithGenerics(Map.class, String.class, Integer.class))); // BodyExtractors.toFluxをstatic import
        return ok().contentType(MediaType.TEXT_EVENT_STREAM)
                .body(fromPublisher(body
                        .map(m -> Collections.singletonMap("double", m.get("value") * 2)),
                        ResolvableType.forClassWithGenerics(Map.class, String.class,
                                Integer.class)));
    }
}

これでHelloController相当のHelloHandlerクラスが出来上がりました。

RequestPredicates.GETRequestPredicates.POST@GetMapping@PostMappingに相当しますが、HTTPメソッドに寄らない@RequestMappingに対応するのはRequestPredicates.pathです。

またこれらはFunctional Interfaceなので、次のようにラムダ式でリクエストのマッチングルールを任意で書くことができます。

.andRoute(req -> req.headers().header("X-Foo").contains("bar"), this::bar) // X-Fooヘッダがbarの場合にマッチ
.andRoute(req -> true, this::foo); // なんでもマッチ

このあたりがRouter Functionsの強力な点です。


本記事ではSpring WebFluxの簡単な使い方を紹介しました。

次回予告

  • はじめてのSpring WebFlux その1.5」ではSpring Bootを使わないRouter FunctionのBootstrapについて紹介します。
  • 「はじめてのSpring WebFlux その2」ではHTTPクライアントであるWebClientとReactiveテストサポートについて紹介します。
  • 「はじめてのSpring WebFlux その3」ではPubSubなメッセージングを使ったReactive Webアプリケーションを作る予定です。

繰り返しになりますが、2017/5/17のJava Day Tokyo 2017のD1-D5でSpring WebFluxに全体像について詳しくお話します。 http://www.oracle.co.jp/events/javaday/2017

追記 Java Day Tokyoの資料を公開しました


✒️️ Edit  ⏰ History  🗑 Delete