@making's memo

All Categories All Tags Premium (Beta)


Echo Serverを実装して学ぶReactor Nettyによるストリーム処理

Edit History

Reactor 3.1.0, Reactor Netty 0.7.0が正式にリリースされたので、TCP Server版Hello WorldであるEcho ServerをReactor Nettyで実装してみます。

目次

依存ライブラリの追加

Reactor 3.1.0系のバージョン管理はreactor-bomBismuthでメンテナンスされています。これを<dependencyManagement>に設定しておけば<dependencies>内でのバージョン明示は不要です。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

Echo Serverを実装

Reactor NettyのTcpServerの使い方のテンプレートは次の通りです。

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpServer;

import java.util.Optional;
import java.util.function.BiFunction;

public class EchoServer {
    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                // ここに入力 => 出力の処理を書く
                return Flux.never();
            }
        });
    }
}

サーバーサイドプログラミングに慣れていれば、"ああメッセージの処理ハンドラをラムダで書けばいいんだな"と思うかもしれません。
これはある意味合っているのですが、通常の"処理ハンドラ"と大きく違うのは、このハンドラ(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>)が扱うのは1件のメッセージではなく、無限ストリームであるという点です。
引数のNettyInbound/NettyOutboundはServletのHttpServletRequest/HttpServletResponseのように1リクエスト/レスポンスではなく、このサーバーへの入出力全体を表します。なので、この処理ハンドラの返り値が出力ストリームになるわけでもありません。処理が終わらないようにFlux.never()(データは流れないけど終わらないストリーム)を返します。

入力として扱うデータを文字列に絞りましょう。この場合、入力となる文字列の無限ストリームをFluxで次のように取得できます。

        tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                Flux<String> input = inbound.receive().asString();

                return Flux.never();
            }
        });

この入力ストリームを加工して出力ストリームに送ります。
ここで通常のサーバープログラミングのように、1件ずつ処理したいと考えます。Fluxクラスにはデータ1件ごとのコールバックメソッドとしてdoOnNext(Consumer<T>)が用意されています。これを使うとサーバーサイドのコードは次のようになります。(わかりやすいようにラムダ式を使っていません)

        tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                Flux<String> input = inbound.receive().asString();

                input.doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String message) {
                      // 文字列メッセージ一件ずつの処理
                    }
                }).subscribe();

                return Flux.never();
            }
        });

注意点として、入力ストリームのFluxsubscribeメソッドを呼ばないとストリームにデータが流れません。

出力ストリームにこのメッセージを送信するコードは次の通りです。

        tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                Flux<String> input = inbound.receive().asString();

                input.doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String message) {
                        outbound.sendString(Mono.just(message))
                                .then()
                                .subscribe();
                    }
                }).subscribe();

                return Flux.never();
            }
        });

NettyOutbound.sendStringは引数がPublisher<String>なので、StringをメッセージをMono.justで包んでいます。この返り値が出力ストリームになります。この型もNettyOutboundです。これはPublisher<Void>を継承しています。

入力ストリーム同様に出力ストリームもsubscribeしないとデータが流れません。
ここではthenメソッドでMonoに変換してsubscribeします。

ここまででEcho Serverができました。mainメソッドを実行してサーバーを起動時、telnetでTCPサーバーにアクセスします。

$ telnet localhost 7777
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hoge <== 入力
hoge ==> 出力
foo <== 入力
foo ==> 出力
bar <== 入力
bar ==> 出力
^]
telnet> quit
Connection closed.

ncコマンドでも動作確認できます。

$ echo -n 'Hello World!' | nc localhost 7777
Hello World!

上記のプログラムはラムダ式を使うと次のように書けます。

    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait((inbound, outbound) -> {
            Flux<String> input = inbound.receive().asString();
            input.doOnNext(message -> outbound.sendString(Mono.just(message))
                    .then()
                    .subscribe()).subscribe();
            return Flux.never();
        });
    }

実際にメッセージが出力ストリームに流れているかどうかを確認したい場合は、次のように出力メッセージのストリームにlogメソッドを追加します。

    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait((inbound, outbound) -> {
            Flux<String> input = inbound.receive().asString();
            input.doOnNext(message -> outbound.sendString(Mono.just(message).log() /*追加*/)
                    .then()
                    .subscribe()).subscribe();
            return Flux.never();
        });
    }

これで再起動し、再度メッセージを送ると、次のようなログを確認できます。onNextで出力メッセージが流れていることがわかります。

[ INFO] (reactor-tcp-nio-2) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (reactor-tcp-nio-2) | request(32)
[ INFO] (reactor-tcp-nio-2) | onNext(Hello World!)
[TRACE] (reactor-tcp-nio-2) [id: 0x8984720e, L:/0:0:0:0:0:0:0:1:7777 - R:/0:0:0:0:0:0:0:1:52269] Pending write size = 12
[ INFO] (reactor-tcp-nio-2) | request(1)
[ INFO] (reactor-tcp-nio-2) | onComplete()

出力側のsubscribeを削除するとonNextログがでないことを確認できるでしょう。

少し、リファクタしてみます。

上記のコードは入力ストリームのdoOnNextコールバック内で出力ストリームを作成したため、両方のsubscribeが必要でした。
ここで考え方を変えて、入力ストリームを変換して出力ストリームを作成するようにすると、作成されたストリームをsubscribeするだけで入出力両方にデータが流れます。変換したストリームと元のストリームを合流させるにはflatMapを使います。flatMapを使えばEchoServerは次のように書き換えられます。

    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait((inbound, outbound) -> {
            Flux<String> input = inbound.receive().asString();
            input.flatMap(message -> outbound.sendString(Mono.just(message)))
                    .subscribe();
            return Flux.never();
        });
    }

見た目がすっきりしました。

ちなみに、このコードでflatMapmapに変えてもコンパイルは通りますが、データは流れなくなります。この違い、とても重要なので理解してください。

Echo Serverのバッファリング処理

ただのEcho Serverだとつまらないので、もう少しストリーム処理っぽい内容に変えてみます。

NettyOutbound.sendStringの引数はPublisher<String>でした。つまりこのメソッドはメッセージを1件送るためのメソッドではなく、メッセージのストリームを送るメソッドです。

ここでは、EchoServerを3件ごとバッファリングして一気に出力するように書き換えます。Flux<String>のストリームを3件ずつの塊ストリームであるFlux<Flux<String>>に変換するにはwindow(int)メソッドを使用します。

    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait((inbound, outbound) -> {
            Flux<String> input = inbound.receive().asString();
            input.window(3)
                    .flatMap(messages -> outbound.sendString(messages))
                    .subscribe();
            return Flux.never();
        });
    }

これで3件ずつまとめてデータを扱うEchoServerになりました。
telnetで動作確認してみます。

$ telnet localhost 7777
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hoge <== 入力
foo <== 入力
bar <== 入力
hoge ==> 出力
foo ==> 出力
bar ==> 出力
^]
telnet> quit
Connection closed.

前の例と異なり、3件単位でデータが流れていることがわかります。


Echo Serverを実装を通じてReactor Nettyを使ったTCP Serverの作り方及び、ストリームを扱うプログラミングの考え方を簡単に学びました。
Reactorを使うことでこれまで使ってこなかったストリーム脳で考えないといけない場面が増えてくるので、この記事がとっかかりになればと思います。

このエントリーをはてなブックマークに追加