Reactor 3.1, Reactor Netty 0.7が正式にリリースされたので、TCP Server版Hello WorldであるEcho ServerをReactor Nettyで実装してみます。
Note
2018-01-04 Bismuth-SR4版 (Reactor 3.1.2, Reactor Netty 0.7.2)にアップデート
目次
依存ライブラリの追加
Reactor 3.1.x系のバージョン管理はreactor-bomのBismuthでメンテナンスされています。これを<dependencyManagement>に設定しておけば<dependencies>内でのバージョン明示は不要です。
<dependencies>
<dependency>
<groupId>io.projectreactor.ipc</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-SR4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Echo Serverを実装
Reactor NettyのTcpServerの使い方のテンプレートは次の通りです。
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpServer;
import java.util.Optional;
import java.util.function.BiFunction;
public class EchoServer {
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
// ここに入力 => 出力の処理を書く
return Flux.never();
}
});
}
}
サーバーサイドプログラミングに慣れていれば、"ああメッセージの処理ハンドラをラムダで書けばいいんだな"と思うかもしれません。
これはある意味合っているのですが、通常の"処理ハンドラ"と大きく違うのは、このハンドラ(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>)が扱うのは1件のメッセージではなく、無限ストリームであるという点です。
引数のNettyInbound/NettyOutboundはServletのHttpServletRequest/HttpServletResponseのように1リクエスト/レスポンスではなく、このサーバーへの入出力全体を表します。なので、この処理ハンドラの返り値が出力ストリームになるわけでもありません。処理が終わらないようにFlux.never()(データは流れないけど終わらないストリーム)を返します。
入力として扱うデータを文字列に絞りましょう。この場合、入力となる文字列の無限ストリームをFluxで次のように取得できます。
tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Flux<String> input = inbound.receive().asString();
return Flux.never();
}
});
この入力ストリームを加工して出力ストリームに送ります。
ここで通常のサーバープログラミングのように、1件ずつ処理したいと考えます。Fluxクラスにはデータ1件ごとのコールバックメソッドとしてdoOnNext(Consumer<T>)が用意されています。これを使うとサーバーサイドのコードは次のようになります。(わかりやすいようにラムダ式を使っていません)
tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Flux<String> input = inbound.receive().asString();
input.doOnNext(new Consumer<String>() {
@Override
public void accept(String message) {
// 文字列メッセージ一件ずつの処理
}
}).subscribe();
return Flux.never();
}
});
注意点として、入力ストリームのFluxはsubscribeメソッドを呼ばないとストリームにデータが流れません。
出力ストリームにこのメッセージを送信するコードは次の通りです。
tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Flux<String> input = inbound.receive().asString();
input.doOnNext(new Consumer<String>() {
@Override
public void accept(String message) {
outbound.sendString(Mono.just(message))
.then()
.subscribe();
}
}).subscribe();
return Flux.never();
}
});
NettyOutbound.sendStringは引数がPublisher<String>なので、StringをメッセージをMono.justで包んでいます。この返り値が出力ストリームになります。この型もNettyOutboundです。これはPublisher<Void>を継承しています。
入力ストリーム同様に出力ストリームもsubscribeしないとデータが流れません。
ここではthenメソッドでMonoに変換してsubscribeします。
ここまででEcho Serverができました。mainメソッドを実行してサーバーを起動時、telnetでTCPサーバーにアクセスします。
$ telnet localhost 7777
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hoge <== 入力
hoge ==> 出力
foo <== 入力
foo ==> 出力
bar <== 入力
bar ==> 出力
^]
telnet> quit
Connection closed.
ncコマンドでも動作確認できます。
$ echo -n 'Hello World!' | nc localhost 7777
Hello World!
上記のプログラムはラムダ式を使うと次のように書けます。
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait((inbound, outbound) -> {
Flux<String> input = inbound.receive().asString();
input.doOnNext(message -> outbound.sendString(Mono.just(message))
.then()
.subscribe()).subscribe();
return Flux.never();
});
}
実際にメッセージが出力ストリームに流れているかどうかを確認したい場合は、次のように出力メッセージのストリームにlogメソッドを追加します。
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait((inbound, outbound) -> {
Flux<String> input = inbound.receive().asString();
input.doOnNext(message -> outbound.sendString(Mono.just(message).log() /*追加*/)
.then()
.subscribe()).subscribe();
return Flux.never();
});
}
これで再起動し、再度メッセージを送ると、次のようなログを確認できます。onNextで出力メッセージが流れていることがわかります。
[ INFO] (reactor-tcp-nio-2) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (reactor-tcp-nio-2) | request(32)
[ INFO] (reactor-tcp-nio-2) | onNext(Hello World!)
[TRACE] (reactor-tcp-nio-2) [id: 0x8984720e, L:/0:0:0:0:0:0:0:1:7777 - R:/0:0:0:0:0:0:0:1:52269] Pending write size = 12
[ INFO] (reactor-tcp-nio-2) | request(1)
[ INFO] (reactor-tcp-nio-2) | onComplete()
出力側のsubscribeを削除するとonNextログがでないことを確認できるでしょう。
少し、リファクタしてみます。
上記のコードは入力ストリームのdoOnNextコールバック内で出力ストリームを作成したため、両方のsubscribeが必要でした。
ここで考え方を変えて、入力ストリームを変換して出力ストリームを作成するようにすると、作成されたストリームをsubscribeするだけで入出力両方にデータが流れます。変換したストリームと元のストリームを合流させるにはflatMapを使います。flatMapを使えばEchoServerは次のように書き換えられます。
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait((inbound, outbound) -> {
Flux<String> input = inbound.receive().asString();
input.flatMap(message -> outbound.sendString(Mono.just(message)))
.subscribe();
return Flux.never();
});
}
見た目がすっきりしました。
ちなみに、このコードで
flatMapをmapに変えてもコンパイルは通りますが、データは流れなくなります。この違い、とても重要なので理解してください。
Echo Serverのバッファリング処理
ただのEcho Serverだとつまらないので、もう少しストリーム処理っぽい内容に変えてみます。
NettyOutbound.sendStringの引数はPublisher<String>でした。つまりこのメソッドはメッセージを1件送るためのメソッドではなく、メッセージのストリームを送るメソッドです。
ここでは、EchoServerを3件ごとバッファリングして一気に出力するように書き換えます。Flux<String>のストリームを3件ずつの塊ストリームであるFlux<Flux<String>>に変換するにはwindow(int)メソッドを使用します。
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait((inbound, outbound) -> {
Flux<String> input = inbound.receive().asString();
input.window(3)
.flatMap(messages -> outbound.sendString(messages))
.subscribe();
return Flux.never();
});
}
これで3件ずつまとめてデータを扱うEchoServerになりました。telnetで動作確認してみます。
$ telnet localhost 7777
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hoge <== 入力
foo <== 入力
bar <== 入力
hoge ==> 出力
foo ==> 出力
bar ==> 出力
^]
telnet> quit
Connection closed.
前の例と異なり、3件単位でデータが流れていることがわかります。
Echo Serverを実装を通じてReactor Nettyを使ったTCP Serverの作り方及び、ストリームを扱うプログラミングの考え方を簡単に学びました。
Reactorを使うことでこれまで使ってこなかったストリーム脳で考えないといけない場面が増えてくるので、この記事がとっかかりになればと思います。