前回、Echo Serverを実装してReactor NettyでTCP Serverを作成する方法を学びました。 今回はクライアント編です。
目次
依存ライブラリの追加
依存ライブラリはServerと同じです。
Reactor 3.1.x系のバージョン管理はreactor-bom
のBismuth
でメンテナンスされており、これを<dependencyManagement>
に設定しておけば<dependencies>
内でのバージョン明示は不要です。
<dependencies>
<dependency>
<groupId>io.projectreactor.ipc</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-SR4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Echo Serverを実装
Reactor NettyのTcpCliet
の使い方のテンプレートは次の通りで、TcpServer
と基本的には同じです。
import org.reactivestreams.Publisher;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpClient;
import java.util.function.BiFunction;
public class EchoClient {
public static void main(String[] args) throws Exception {
TcpClient client = TcpClient.create("localhost", 7777);
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
return null /* Responseが終了したら完了するPublisher<Void> */;
}
});
}
}
ただし、Serverの場合、
NettyInbound
= Request (サーバーから見てリクエスト受信) , NettyOutbound
= Response (サーバーから見てレスポンス送信)
だったのが、Clientの場合は、
NettyInbound
= Response (クライアントから見てレスポンス受信), NettyOutbound
= Request (クライアントから見てリクエスト送信)
になります。
また、今回はEcho Clientとして、1リクエストを送ると1レスポンスが返って終了するという挙動を期待します。ですので、startAndAwait
メソッドに渡すハンドラの返り値は
Serverの時のようにFlux.never()
で終了しないストリームを返すのではなく、レスポンスを受け取ったらたら完了するストリームを返すようにします。
サーバーにつなぎっぱなしのクライアントを実装したい場合は、終わらないストリームを返します。
まずは、リクエストを送信します。今回は1件だけデータを送ることを想定するのでMono<String>
でデータを表現します。
データの送信にはNettyOutbound.send
やNettyOutbound.sendString
メソッドが用意されています。返り値はNettyOutbound
で、この型はPublisher<Void>
を継承しています。
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Mono<String> input = Mono.just("Hello World!");
NettyOutbound send = outbound.sendString(input);
return null /* Responseが終了したら完了するPublisher<Void> */;
}
});
当然、subscribe
しないと実際にストリームは流れませんが、startAndAwait
に渡したハンドラの実行結果のPublisher<Void>
は自動でsubscribe
されます。
すなわち、次のように書くと、サーバーでデータを送信し、送信完了すると終了するクライアント処理ができます。
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Mono<String> input = Mono.just("Hello World!");
NettyOutbound send = outbound.sendString(input);
return send;
}
});
実行すると次のようなログが出力されます。
[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0xcc0c8586], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@3d20d50c
[DEBUG] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] User Handler requesting close connection
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Disposing ChannelOperation from a channel
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@673e0aa6
[DEBUG] (reactor-tcp-nio-4) Releasing channel: [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777]
Echo Server側を見るとHello World!
が出力されるでしょう。
ちなみに、
startAndAwait
メソッドは名前の通りブロッキングなメソッドです。ハンドラの実行結果のPublisher
をsubscribe
して完了するまでブロックします。自分で制御したい場合は次のように書けます。Mono<? extends NettyContext> handler = client.newHandler(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() { /* ... */ }); handler.subscribe();
サーバーからのレスポンスも取得したいので、send
が終わったらデータの受信する処理(inbond.receive()
)をつなげます。処理をつなげる場合はthen
メソッドが使えます。
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Mono<String> input = Mono.just("Hello World!");
NettyOutbound send = outbound.sendString(input);
return send.then(inbound.receive().then());
}
});
NettyOutbound.then
の引数はPublisher<Void>
でないといけないので、receive()
の結果に対して更にthen()
を実行します。
受信したデータをasString()
メソッドでFlux<String>
に変換し、標準出力に表示してみます。
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Mono<String> input = Mono.just("Hello World!");
NettyOutbound send = outbound.sendString(input);
return send.then(inbound.receive()
.asString()
.doOnNext(s -> System.out.println("Received => " + s))
.then());
}
});
実行すると次のようなログが出力されます。
[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0x54e9a483], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@34d84ada
[DEBUG] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
Received => Hello World!
一見うまくいっているように見えますが、実は問題があります。プログラムが終了していません。
わかりやすいようにlog
メソッドを追加します。
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Mono<String> input = Mono.just("Hello World!");
NettyOutbound send = outbound.sendString(input);
return send.then(inbound.receive()
.asString()
.log() // added
.doOnNext(s -> System.out.println("Received => " + s))
.then());
}
});
この状態で実行すると、次のようなログが出力されます。
[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0xd4aa0cdf], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@6ffe96da
[DEBUG] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) onSubscribe(FluxMap.MapSubscriber)
[ INFO] (reactor-tcp-nio-4) request(unbounded)
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[ INFO] (reactor-tcp-nio-4) onNext(Hello World!)
Received => Hello World!
確かにデータは受信できているのですが、onComplete
が呼ばれていません。そのため、ハンドラーの返り値のストリームはまだ受信するデータがあるかもしれないから終わらない、という状態です。
どのタイミングで終了したいかはプロトコル次第ですが、今回は一件データが取得できればそこで終了したいです。
なので、Flux
のデータを一件だけ受け取ったら終了するMono
に変換すれば良いです。この用途にぴったりのメソッドがFlux.next()
です。
asString()
の後にnext()
を追加します。
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Mono<String> input = Mono.just("Hello World!");
NettyOutbound send = outbound.sendString(input);
return send.then(inbound.receive()
.asString()
.next() // added
.log()
.doOnNext(s -> System.out.println("Received => " + s))
.then());
}
});
この状態で実行すると、次のようなログが出力されます。
[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0x778cbeac], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@34d84ada
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) onSubscribe(MonoNext.NextSubscriber)
[ INFO] (reactor-tcp-nio-4) request(unbounded)
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[ INFO] (reactor-tcp-nio-4) onNext(Hello World!)
Received => Hello World!
[ INFO] (reactor-tcp-nio-4) onComplete()
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] User Handler requesting close connection
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Disposing ChannelOperation from a channel
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@685b8b20
[DEBUG] (reactor-tcp-nio-4) Releasing channel: [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777]
onComplete
が呼ばれ、無事プログラムが終了しました。
ラムダ式を使えば次のようにスッキリします。
client.startAndAwait((inbound, outbound) -> {
Mono<String> input = Mono.just("Hello World!");
NettyOutbound send = outbound.sendString(input);
return send.then(inbound.receive()
.asString()
.next()
.log()
.doOnNext(s -> System.out.println("Received => " + s))
.then());
});
ほぼOKなのですが、TCP Clientを作ったならば、返り値もMono<String>
で受けたいです。この場合は、次のようにMono.create
メソッドを使用します。
Mono<String> result = Mono.create(sink ->
client.startAndAwait((inbound, outbound) -> {
Mono<String> input = Mono.just("Hello World!");
NettyOutbound send = outbound.sendString(input);
return send.then(inbound.receive()
.asString()
.next()
.log()
.doOnNext(sink::success)
.doOnError(sink::error)
.then());
}));
まとめると、EchoClient
クラスを次のように書けます。
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpClient;
public class EchoClient {
private final TcpClient client;
public EchoClient(String host, int port) {
this.client = TcpClient.create(host, port);
}
public Mono<String> echo(Mono<String> input) {
return Mono.create(sink ->
client.startAndAwait((inbound, outbound) -> outbound
.sendString(input)
.then(inbound.receive()
.asString()
.next()
.doOnNext(sink::success)
.doOnError(sink::error)
.then())));
}
}
これでNon-BlockingなEcho Clientが出来上がりました。
と、思ったけれど、よく見るとstartAndAwait
はブロッキングなメソッドなので、このままではダメですね。Mono.create
の中ではstart
で止めて、後始末をコールバックで書くのが良いでしょう。
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.BlockingNettyContext;
import reactor.ipc.netty.tcp.TcpClient;
public class EchoClient {
private final TcpClient client;
public EchoClient(String host, int port) {
this.client = TcpClient.create(host, port);
}
public Mono<String> echo(Mono<String> input) {
return Mono.create(sink -> {
BlockingNettyContext context = client.start((inbound, outbound) -> outbound
.sendString(input)
.then(inbound.receive()
.asString()
.next()
.doOnNext(sink::success)
.doOnError(sink::error)
.then()));
sink.onDispose(() -> context.getContext().dispose());
});
}
}
これでEchoClient
はOKでしょう。次のように利用できます。
EchoClient echoClient = new EchoClient("localhost", 7777);
Mono<String> result = echoClient.echo(Mono.just("Hello World!"));
result.log()
.doOnNext(s -> System.out.println("Received => " + s))
.subscribe();
// mainメソッドで実行する場合はThread.sleep(1000);などして、プログラムが先に終了しないようにブロックする必要あり。
無限ストリームに対応
では、このEchoClient
で無限ストリームに対応するために、Flux
に対応するとどうなるでしょうか。
public Flux<String> echo(Flux<String> input) {
// ....
}
次の呼び出しに対して、
Flux<String> input = Flux.interval(Duration.ofMillis(500)).map(i -> "Hi " + i);
Flux<String> result = echoClient.echo(input);
result.take(10)
.log()
.doOnNext(s -> System.out.println("Received => " + s))
.subscribe();
次のように出力して欲しいです。
Received => Hi 0
Received => Hi 1
Received => Hi 2
Received => Hi 3
Received => Hi 4
Received => Hi 5
Received => Hi 6
Received => Hi 7
Received => Hi 8
Received => Hi 9
考えてみてください。