--- title: Echo Clientを実装して学ぶReactor NettyのTCP Client tags: ["Reactor", "Reactor Netty", "Netty", "Java"] categories: ["Programming", "Java", "reactor", "ipc", "netty"] date: 2018-01-05T19:43:12Z updated: 2018-01-17T15:32:05Z --- [前回](/entries/438)、Echo Serverを実装してReactor NettyでTCP Serverを作成する方法を学びました。 今回はクライアント編です。 **目次** ### 依存ライブラリの追加 依存ライブラリはServerと同じです。 Reactor 3.1.x系のバージョン管理は`reactor-bom`の`Bismuth`でメンテナンスされており、これを``に設定しておけば``内でのバージョン明示は不要です。 ```xml io.projectreactor.ipc reactor-netty io.projectreactor reactor-test test io.projectreactor reactor-bom Bismuth-SR4 pom import ``` ### Echo Serverを実装 Reactor Nettyの`TcpCliet`の使い方のテンプレートは次の通りで、`TcpServer`と基本的には同じです。 ``` java import org.reactivestreams.Publisher; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.tcp.TcpClient; import java.util.function.BiFunction; public class EchoClient { public static void main(String[] args) throws Exception { TcpClient client = TcpClient.create("localhost", 7777); client.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { return null /* Responseが終了したら完了するPublisher */; } }); } } ``` ただし、Serverの場合、 `NettyInbound` = Request (サーバーから見てリクエスト受信) , `NettyOutbound` = Response (サーバーから見てレスポンス送信) だったのが、Clientの場合は、 `NettyInbound` = Response (クライアントから見てレスポンス受信), `NettyOutbound` = Request (クライアントから見てリクエスト送信) になります。 また、今回はEcho Clientとして、1リクエストを送ると1レスポンスが返って終了するという挙動を期待します。ですので、`startAndAwait`メソッドに渡すハンドラの返り値は Serverの時のように`Flux.never()`で終了しないストリームを返すのではなく、レスポンスを受け取ったらたら完了するストリームを返すようにします。 > サーバーにつなぎっぱなしのクライアントを実装したい場合は、終わらないストリームを返します。 まずは、リクエストを送信します。今回は1件だけデータを送ることを想定するので`Mono`でデータを表現します。 データの送信には`NettyOutbound.send`や`NettyOutbound.sendString`メソッドが用意されています。返り値は`NettyOutbound`で、この型は`Publisher`を継承しています。 ``` java client.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Mono input = Mono.just("Hello World!"); NettyOutbound send = outbound.sendString(input); return null /* Responseが終了したら完了するPublisher */; } }); ``` 当然、`subscribe`しないと実際にストリームは流れませんが、`startAndAwait`に渡したハンドラの実行結果の`Publisher`は自動で`subscribe`されます。 すなわち、次のように書くと、サーバーでデータを送信し、送信完了すると終了するクライアント処理ができます。 ``` java client.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Mono input = Mono.just("Hello World!"); NettyOutbound send = outbound.sendString(input); return send; } }); ``` 実行すると次のようなログが出力されます。 ``` [DEBUG] (main) Using Console logging [DEBUG] (main) Default epoll support : false [DEBUG] (main) New tcp client pool for localhost:7777 [DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1} [DEBUG] (reactor-tcp-nio-4) Created [id: 0xcc0c8586], now 1 active connections [DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)} [DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] [DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@3d20d50c [DEBUG] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable [TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Pending write size = 12 [DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] User Handler requesting close connection [TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Disposing ChannelOperation from a channel [ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777 [TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] End of the pipeline, User event [Handler Terminated] [DEBUG] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@673e0aa6 [DEBUG] (reactor-tcp-nio-4) Releasing channel: [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] ``` Echo Server側を見ると`Hello World!`が出力されるでしょう。 > ちなみに、`startAndAwait`メソッドは名前の通りブロッキングなメソッドです。ハンドラの実行結果の`Publisher`を`subscribe`して完了するまでブロックします。自分で制御したい場合は次のように書けます。 > > ``` java > Mono handler = client.newHandler(new BiFunction>() { /* ... */ }); > handler.subscribe(); > ``` サーバーからのレスポンスも取得したいので、`send`が終わったらデータの受信する処理(`inbond.receive()`)をつなげます。処理をつなげる場合は`then`メソッドが使えます。 ``` java client.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Mono input = Mono.just("Hello World!"); NettyOutbound send = outbound.sendString(input); return send.then(inbound.receive().then()); } }); ``` `NettyOutbound.then`の引数は`Publisher`でないといけないので、`receive()`の結果に対して更に`then()`を実行します。 受信したデータを`asString()`メソッドで`Flux`に変換し、標準出力に表示してみます。 ``` java client.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Mono input = Mono.just("Hello World!"); NettyOutbound send = outbound.sendString(input); return send.then(inbound.receive() .asString() .doOnNext(s -> System.out.println("Received => " + s)) .then()); } }); ``` 実行すると次のようなログが出力されます。 ``` [DEBUG] (main) Using Console logging [DEBUG] (main) Default epoll support : false [DEBUG] (main) New tcp client pool for localhost:7777 [DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1} [DEBUG] (reactor-tcp-nio-4) Created [id: 0x54e9a483], now 1 active connections [DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)} [DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] [DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@34d84ada [DEBUG] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable [TRACE] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Pending write size = 12 [DEBUG] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false] [ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777 Received => Hello World! ``` 一見うまくいっているように見えますが、実は問題があります。プログラムが終了していません。 わかりやすいように`log`メソッドを追加します。 ``` java client.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Mono input = Mono.just("Hello World!"); NettyOutbound send = outbound.sendString(input); return send.then(inbound.receive() .asString() .log() // added .doOnNext(s -> System.out.println("Received => " + s)) .then()); } }); ``` この状態で実行すると、次のようなログが出力されます。 ``` [DEBUG] (main) Using Console logging [DEBUG] (main) Default epoll support : false [DEBUG] (main) New tcp client pool for localhost:7777 [DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1} [DEBUG] (reactor-tcp-nio-4) Created [id: 0xd4aa0cdf], now 1 active connections [DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)} [DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] [DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@6ffe96da [DEBUG] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable [TRACE] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Pending write size = 12 [DEBUG] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false] [ INFO] (reactor-tcp-nio-4) onSubscribe(FluxMap.MapSubscriber) [ INFO] (reactor-tcp-nio-4) request(unbounded) [ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777 [ INFO] (reactor-tcp-nio-4) onNext(Hello World!) Received => Hello World! ``` 確かにデータは受信できているのですが、`onComplete`が呼ばれていません。そのため、ハンドラーの返り値のストリームはまだ受信するデータがあるかもしれないから終わらない、という状態です。 どのタイミングで終了したいかはプロトコル次第ですが、今回は一件データが取得できればそこで終了したいです。 なので、`Flux`のデータを一件だけ受け取ったら終了する`Mono`に変換すれば良いです。この用途にぴったりのメソッドが[`Flux.next()`](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#next--)です。 ![](https://raw.githubusercontent.com/reactor/reactor-core/v3.1.2.RELEASE/src/docs/marble/next.png) `asString()`の後に`next()`を追加します。 ``` java client.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Mono input = Mono.just("Hello World!"); NettyOutbound send = outbound.sendString(input); return send.then(inbound.receive() .asString() .next() // added .log() .doOnNext(s -> System.out.println("Received => " + s)) .then()); } }); ``` この状態で実行すると、次のようなログが出力されます。 ``` [DEBUG] (main) Using Console logging [DEBUG] (main) Default epoll support : false [DEBUG] (main) New tcp client pool for localhost:7777 [DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1} [DEBUG] (reactor-tcp-nio-4) Created [id: 0x778cbeac], now 1 active connections [DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)} [DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] [DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@34d84ada [DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable [TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Pending write size = 12 [DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false] [ INFO] (reactor-tcp-nio-4) onSubscribe(MonoNext.NextSubscriber) [ INFO] (reactor-tcp-nio-4) request(unbounded) [ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777 [ INFO] (reactor-tcp-nio-4) onNext(Hello World!) Received => Hello World! [ INFO] (reactor-tcp-nio-4) onComplete() [DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] User Handler requesting close connection [TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Disposing ChannelOperation from a channel [TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] End of the pipeline, User event [Handler Terminated] [DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@685b8b20 [DEBUG] (reactor-tcp-nio-4) Releasing channel: [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] ``` `onComplete`が呼ばれ、無事プログラムが終了しました。 ラムダ式を使えば次のようにスッキリします。 ``` java client.startAndAwait((inbound, outbound) -> { Mono input = Mono.just("Hello World!"); NettyOutbound send = outbound.sendString(input); return send.then(inbound.receive() .asString() .next() .log() .doOnNext(s -> System.out.println("Received => " + s)) .then()); }); ``` ほぼOKなのですが、TCP Clientを作ったならば、返り値も`Mono`で受けたいです。この場合は、次のように`Mono.create`メソッドを使用します。 ``` java Mono result = Mono.create(sink -> client.startAndAwait((inbound, outbound) -> { Mono input = Mono.just("Hello World!"); NettyOutbound send = outbound.sendString(input); return send.then(inbound.receive() .asString() .next() .log() .doOnNext(sink::success) .doOnError(sink::error) .then()); })); ``` まとめると、`EchoClient`クラスを次のように書けます。 ``` java import reactor.core.publisher.Mono; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.tcp.TcpClient; public class EchoClient { private final TcpClient client; public EchoClient(String host, int port) { this.client = TcpClient.create(host, port); } public Mono echo(Mono input) { return Mono.create(sink -> client.startAndAwait((inbound, outbound) -> outbound .sendString(input) .then(inbound.receive() .asString() .next() .doOnNext(sink::success) .doOnError(sink::error) .then()))); } } ``` これでNon-BlockingなEcho Clientが出来上がりました。 と、思ったけれど、よく見ると`startAndAwait`はブロッキングなメソッドなので、このままではダメですね。`Mono.create`の中では`start`で止めて、後始末をコールバックで書くのが良いでしょう。 ``` java import reactor.core.publisher.Mono; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.tcp.BlockingNettyContext; import reactor.ipc.netty.tcp.TcpClient; public class EchoClient { private final TcpClient client; public EchoClient(String host, int port) { this.client = TcpClient.create(host, port); } public Mono echo(Mono input) { return Mono.create(sink -> { BlockingNettyContext context = client.start((inbound, outbound) -> outbound .sendString(input) .then(inbound.receive() .asString() .next() .doOnNext(sink::success) .doOnError(sink::error) .then())); sink.onDispose(() -> context.getContext().dispose()); }); } } ``` これで`EchoClient`はOKでしょう。次のように利用できます。 ``` java EchoClient echoClient = new EchoClient("localhost", 7777); Mono result = echoClient.echo(Mono.just("Hello World!")); result.log() .doOnNext(s -> System.out.println("Received => " + s)) .subscribe(); // mainメソッドで実行する場合はThread.sleep(1000);などして、プログラムが先に終了しないようにブロックする必要あり。 ``` ### 無限ストリームに対応 では、この`EchoClient`で無限ストリームに対応するために、`Flux`に対応するとどうなるでしょうか。 ``` java public Flux echo(Flux input) { // .... } ``` 次の呼び出しに対して、 ``` java Flux input = Flux.interval(Duration.ofMillis(500)).map(i -> "Hi " + i); Flux result = echoClient.echo(input); result.take(10) .log() .doOnNext(s -> System.out.println("Received => " + s)) .subscribe(); ``` 次のように出力して欲しいです。 ``` Received => Hi 0 Received => Hi 1 Received => Hi 2 Received => Hi 3 Received => Hi 4 Received => Hi 5 Received => Hi 6 Received => Hi 7 Received => Hi 8 Received => Hi 9 ``` 考えてみてください。