Reactor 3.1, Reactor Netty 0.7が正式にリリースされたので、TCP Server版Hello WorldであるEcho ServerをReactor Nettyで実装してみます。
ℹ️ 2018-01-04 Bismuth-SR4版 (Reactor 3.1.2, Reactor Netty 0.7.2)にアップデート
目次
依存ライブラリの追加
Reactor 3.1.x系のバージョン管理はreactor-bom
のBismuth
でメンテナンスされています。これを<dependencyManagement>
に設定しておけば<dependencies>
内でのバージョン明示は不要です。
<dependencies>
<dependency>
<groupId>io.projectreactor.ipc</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-SR4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Echo Serverを実装
Reactor NettyのTcpServer
の使い方のテンプレートは次の通りです。
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpServer;
import java.util.Optional;
import java.util.function.BiFunction;
public class EchoServer {
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
// ここに入力 => 出力の処理を書く
return Flux.never();
}
});
}
}
サーバーサイドプログラミングに慣れていれば、"ああメッセージの処理ハンドラをラムダで書けばいいんだな"と思うかもしれません。
これはある意味合っているのですが、通常の"処理ハンドラ"と大きく違うのは、このハンドラ(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>
)が扱うのは1件のメッセージではなく、無限ストリームであるという点です。
引数のNettyInbound
/NettyOutbound
はServletのHttpServletRequest
/HttpServletResponse
のように1リクエスト/レスポンスではなく、このサーバーへの入出力全体を表します。なので、この処理ハンドラの返り値が出力ストリームになるわけでもありません。処理が終わらないようにFlux.never()
(データは流れないけど終わらないストリーム)を返します。
入力として扱うデータを文字列に絞りましょう。この場合、入力となる文字列の無限ストリームをFlux
で次のように取得できます。
tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Flux<String> input = inbound.receive().asString();
return Flux.never();
}
});
この入力ストリームを加工して出力ストリームに送ります。
ここで通常のサーバープログラミングのように、1件ずつ処理したいと考えます。Flux
クラスにはデータ1件ごとのコールバックメソッドとしてdoOnNext(Consumer<T>)
が用意されています。これを使うとサーバーサイドのコードは次のようになります。(わかりやすいようにラムダ式を使っていません)
tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Flux<String> input = inbound.receive().asString();
input.doOnNext(new Consumer<String>() {
@Override
public void accept(String message) {
// 文字列メッセージ一件ずつの処理
}
}).subscribe();
return Flux.never();
}
});
注意点として、入力ストリームのFlux
はsubscribe
メソッドを呼ばないとストリームにデータが流れません。
出力ストリームにこのメッセージを送信するコードは次の通りです。
tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
Flux<String> input = inbound.receive().asString();
input.doOnNext(new Consumer<String>() {
@Override
public void accept(String message) {
outbound.sendString(Mono.just(message))
.then()
.subscribe();
}
}).subscribe();
return Flux.never();
}
});
NettyOutbound.sendString
は引数がPublisher<String>
なので、String
をメッセージをMono.just
で包んでいます。この返り値が出力ストリームになります。この型もNettyOutbound
です。これはPublisher<Void>
を継承しています。
入力ストリーム同様に出力ストリームもsubscribe
しないとデータが流れません。
ここではthen
メソッドでMono
に変換してsubscribe
します。
ここまででEcho Serverができました。main
メソッドを実行してサーバーを起動時、telnet
でTCPサーバーにアクセスします。
$ telnet localhost 7777
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hoge <== 入力
hoge ==> 出力
foo <== 入力
foo ==> 出力
bar <== 入力
bar ==> 出力
^]
telnet> quit
Connection closed.
nc
コマンドでも動作確認できます。
$ echo -n 'Hello World!' | nc localhost 7777
Hello World!
上記のプログラムはラムダ式を使うと次のように書けます。
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait((inbound, outbound) -> {
Flux<String> input = inbound.receive().asString();
input.doOnNext(message -> outbound.sendString(Mono.just(message))
.then()
.subscribe()).subscribe();
return Flux.never();
});
}
実際にメッセージが出力ストリームに流れているかどうかを確認したい場合は、次のように出力メッセージのストリームにlog
メソッドを追加します。
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait((inbound, outbound) -> {
Flux<String> input = inbound.receive().asString();
input.doOnNext(message -> outbound.sendString(Mono.just(message).log() /*追加*/)
.then()
.subscribe()).subscribe();
return Flux.never();
});
}
これで再起動し、再度メッセージを送ると、次のようなログを確認できます。onNext
で出力メッセージが流れていることがわかります。
[ INFO] (reactor-tcp-nio-2) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (reactor-tcp-nio-2) | request(32)
[ INFO] (reactor-tcp-nio-2) | onNext(Hello World!)
[TRACE] (reactor-tcp-nio-2) [id: 0x8984720e, L:/0:0:0:0:0:0:0:1:7777 - R:/0:0:0:0:0:0:0:1:52269] Pending write size = 12
[ INFO] (reactor-tcp-nio-2) | request(1)
[ INFO] (reactor-tcp-nio-2) | onComplete()
出力側のsubscribe
を削除するとonNext
ログがでないことを確認できるでしょう。
少し、リファクタしてみます。
上記のコードは入力ストリームのdoOnNext
コールバック内で出力ストリームを作成したため、両方のsubscribe
が必要でした。
ここで考え方を変えて、入力ストリームを変換して出力ストリームを作成するようにすると、作成されたストリームをsubscribe
するだけで入出力両方にデータが流れます。変換したストリームと元のストリームを合流させるにはflatMap
を使います。flatMap
を使えばEchoServerは次のように書き換えられます。
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait((inbound, outbound) -> {
Flux<String> input = inbound.receive().asString();
input.flatMap(message -> outbound.sendString(Mono.just(message)))
.subscribe();
return Flux.never();
});
}
見た目がすっきりしました。
ちなみに、このコードで
flatMap
をmap
に変えてもコンパイルは通りますが、データは流れなくなります。この違い、とても重要なので理解してください。
Echo Serverのバッファリング処理
ただのEcho Serverだとつまらないので、もう少しストリーム処理っぽい内容に変えてみます。
NettyOutbound.sendString
の引数はPublisher<String>
でした。つまりこのメソッドはメッセージを1件送るためのメソッドではなく、メッセージのストリームを送るメソッドです。
ここでは、EchoServerを3件ごとバッファリングして一気に出力するように書き換えます。Flux<String>
のストリームを3件ずつの塊ストリームであるFlux<Flux<String>>
に変換するにはwindow(int)
メソッドを使用します。
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
tcpServer.startAndAwait((inbound, outbound) -> {
Flux<String> input = inbound.receive().asString();
input.window(3)
.flatMap(messages -> outbound.sendString(messages))
.subscribe();
return Flux.never();
});
}
これで3件ずつまとめてデータを扱うEchoServerになりました。
telnet
で動作確認してみます。
$ telnet localhost 7777
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hoge <== 入力
foo <== 入力
bar <== 入力
hoge ==> 出力
foo ==> 出力
bar ==> 出力
^]
telnet> quit
Connection closed.
前の例と異なり、3件単位でデータが流れていることがわかります。
Echo Serverを実装を通じてReactor Nettyを使ったTCP Serverの作り方及び、ストリームを扱うプログラミングの考え方を簡単に学びました。 Reactorを使うことでこれまで使ってこなかったストリーム脳で考えないといけない場面が増えてくるので、この記事がとっかかりになればと思います。