📝 BLOG.IK.AM

@making's memo
(🗃 Categories 🏷 Tags)

Echo Clientを実装して学ぶReactor NettyのTCP Client

🗃 {Programming/Java/reactor/ipc/netty}

🏷 Java 🏷 Netty 🏷 Reactor 🏷 Reactor Netty

🗓 Updated at 2018-01-17T15:32:05+09:00 by Toshiaki Maki  🗓 Created at 2018-01-05T19:43:12+09:00 by Toshiaki Maki  {✒️️ Edit  ⏰ History}


⚠️ Caution: This content is a bit old. Please be careful to read.

前回、Echo Serverを実装してReactor NettyでTCP Serverを作成する方法を学びました。
今回はクライアント編です。

目次

依存ライブラリの追加

依存ライブラリはServerと同じです。
Reactor 3.1.x系のバージョン管理はreactor-bomBismuthでメンテナンスされており、これを<dependencyManagement>に設定しておけば<dependencies>内でのバージョン明示は不要です。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

Echo Serverを実装

Reactor NettyのTcpClietの使い方のテンプレートは次の通りで、TcpServerと基本的には同じです。

import org.reactivestreams.Publisher;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpClient;

import java.util.function.BiFunction;

public class EchoClient {

    public static void main(String[] args) throws Exception {
        TcpClient client = TcpClient.create("localhost", 7777);
        client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                return null /* Responseが終了したら完了するPublisher<Void> */;
            }
        });
    }
}

ただし、Serverの場合、

NettyInbound = Request (サーバーから見てリクエスト受信) , NettyOutbound = Response (サーバーから見てレスポンス送信)

だったのが、Clientの場合は、

NettyInbound = Response (クライアントから見てレスポンス受信), NettyOutbound = Request (クライアントから見てリクエスト送信)

になります。

また、今回はEcho Clientとして、1リクエストを送ると1レスポンスが返って終了するという挙動を期待します。ですので、startAndAwaitメソッドに渡すハンドラの返り値は
Serverの時のようにFlux.never()で終了しないストリームを返すのではなく、レスポンスを受け取ったらたら完了するストリームを返すようにします。

サーバーにつなぎっぱなしのクライアントを実装したい場合は、終わらないストリームを返します。

まずは、リクエストを送信します。今回は1件だけデータを送ることを想定するのでMono<String>でデータを表現します。
データの送信にはNettyOutbound.sendNettyOutbound.sendStringメソッドが用意されています。返り値はNettyOutboundで、この型はPublisher<Void>を継承しています。

client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return null /* Responseが終了したら完了するPublisher<Void> */;
    }
});

当然、subscribeしないと実際にストリームは流れませんが、startAndAwaitに渡したハンドラの実行結果のPublisher<Void>は自動でsubscribeされます。
すなわち、次のように書くと、サーバーでデータを送信し、送信完了すると終了するクライアント処理ができます。

client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send;
    }
});

実行すると次のようなログが出力されます。

[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: [email protected](incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0xcc0c8586], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] handler is being applied: [email protected]
[DEBUG] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] User Handler requesting close connection
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Disposing ChannelOperation from a channel
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Disposing context [email protected]
[DEBUG] (reactor-tcp-nio-4) Releasing channel: [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777]

Echo Server側を見るとHello World!が出力されるでしょう。

ちなみに、startAndAwaitメソッドは名前の通りブロッキングなメソッドです。ハンドラの実行結果のPublishersubscribeして完了するまでブロックします。自分で制御したい場合は次のように書けます。

Mono<? extends NettyContext> handler = client.newHandler(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() { /* ... */ });
handler.subscribe();

サーバーからのレスポンスも取得したいので、sendが終わったらデータの受信する処理(inbond.receive())をつなげます。処理をつなげる場合はthenメソッドが使えます。

client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send.then(inbound.receive().then());
    }
});

NettyOutbound.thenの引数はPublisher<Void>でないといけないので、receive()の結果に対して更にthen()を実行します。

受信したデータをasString()メソッドでFlux<String>に変換し、標準出力に表示してみます。

client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send.then(inbound.receive()
                                .asString()
                                .doOnNext(s -> System.out.println("Received => " + s))
                                .then());
    }
});

実行すると次のようなログが出力されます。

[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: [email protected](incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0x54e9a483], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] handler is being applied: [email protected]
[DEBUG] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
Received => Hello World!

一見うまくいっているように見えますが、実は問題があります。プログラムが終了していません。
わかりやすいようにlogメソッドを追加します。

client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send.then(inbound.receive()
                                .asString()
                                .log() // added
                                .doOnNext(s -> System.out.println("Received => " + s))
                                .then());
    }
});

この状態で実行すると、次のようなログが出力されます。

[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: [email protected](incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0xd4aa0cdf], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] handler is being applied: [email protected]
[DEBUG] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) onSubscribe(FluxMap.MapSubscriber)
[ INFO] (reactor-tcp-nio-4) request(unbounded)
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[ INFO] (reactor-tcp-nio-4) onNext(Hello World!)
Received => Hello World!

確かにデータは受信できているのですが、onCompleteが呼ばれていません。そのため、ハンドラーの返り値のストリームはまだ受信するデータがあるかもしれないから終わらない、という状態です。
どのタイミングで終了したいかはプロトコル次第ですが、今回は一件データが取得できればそこで終了したいです。
なので、Fluxのデータを一件だけ受け取ったら終了するMonoに変換すれば良いです。この用途にぴったりのメソッドがFlux.next()です。

asString()の後にnext()を追加します。

client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send.then(inbound.receive()
                                .asString()
                                .next() // added
                                .log()
                                .doOnNext(s -> System.out.println("Received => " + s))
                                .then());
    }
});

この状態で実行すると、次のようなログが出力されます。

[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: [email protected](incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0x778cbeac], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] handler is being applied: [email protected]
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) onSubscribe(MonoNext.NextSubscriber)
[ INFO] (reactor-tcp-nio-4) request(unbounded)
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[ INFO] (reactor-tcp-nio-4) onNext(Hello World!)
Received => Hello World!
[ INFO] (reactor-tcp-nio-4) onComplete()
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] User Handler requesting close connection
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Disposing ChannelOperation from a channel
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Disposing context [email protected]
[DEBUG] (reactor-tcp-nio-4) Releasing channel: [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777]

onCompleteが呼ばれ、無事プログラムが終了しました。

ラムダ式を使えば次のようにスッキリします。

client.startAndAwait((inbound, outbound) -> {
    Mono<String> input = Mono.just("Hello World!");
    NettyOutbound send = outbound.sendString(input);
    return send.then(inbound.receive()
                            .asString()
                            .next()
                            .log()
                            .doOnNext(s -> System.out.println("Received => " + s))
                            .then());
});

ほぼOKなのですが、TCP Clientを作ったならば、返り値もMono<String>で受けたいです。この場合は、次のようにMono.createメソッドを使用します。

Mono<String> result = Mono.create(sink ->
        client.startAndAwait((inbound, outbound) -> {
            Mono<String> input = Mono.just("Hello World!");
            NettyOutbound send = outbound.sendString(input);
            return send.then(inbound.receive()
                    .asString()
                    .next()
                    .log()
                    .doOnNext(sink::success)
                    .doOnError(sink::error)
                    .then());
        }));

まとめると、EchoClientクラスを次のように書けます。

import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpClient;

public class EchoClient {
    private final TcpClient client;

    public EchoClient(String host, int port) {
        this.client = TcpClient.create(host, port);
    }

    public Mono<String> echo(Mono<String> input) {
        return Mono.create(sink ->
                client.startAndAwait((inbound, outbound) -> outbound
                        .sendString(input)
                        .then(inbound.receive()
                                .asString()
                                .next()
                                .doOnNext(sink::success)
                                .doOnError(sink::error)
                                .then())));
    }
}

これでNon-BlockingなEcho Clientが出来上がりました。

と、思ったけれど、よく見るとstartAndAwaitはブロッキングなメソッドなので、このままではダメですね。Mono.createの中ではstartで止めて、後始末をコールバックで書くのが良いでしょう。

import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.BlockingNettyContext;
import reactor.ipc.netty.tcp.TcpClient;

public class EchoClient {
    private final TcpClient client;

    public EchoClient(String host, int port) {
        this.client = TcpClient.create(host, port);
    }

    public Mono<String> echo(Mono<String> input) {
        return Mono.create(sink -> {
            BlockingNettyContext context = client.start((inbound, outbound) -> outbound
                    .sendString(input)
                    .then(inbound.receive()
                            .asString()
                            .next()
                            .doOnNext(sink::success)
                            .doOnError(sink::error)
                            .then()));
            sink.onDispose(() -> context.getContext().dispose());
        });
    }
}

これでEchoClientはOKでしょう。次のように利用できます。

EchoClient echoClient = new EchoClient("localhost", 7777);
Mono<String> result = echoClient.echo(Mono.just("Hello World!"));
result.log()
        .doOnNext(s -> System.out.println("Received => " + s))
        .subscribe();
// mainメソッドで実行する場合はThread.sleep(1000);などして、プログラムが先に終了しないようにブロックする必要あり。

無限ストリームに対応

では、このEchoClientで無限ストリームに対応するために、Fluxに対応するとどうなるでしょうか。

public Flux<String> echo(Flux<String> input) {
    // ....
}

次の呼び出しに対して、

Flux<String> input = Flux.interval(Duration.ofMillis(500)).map(i -> "Hi " + i);
Flux<String> result = echoClient.echo(input);
result.take(10)
        .log()
        .doOnNext(s -> System.out.println("Received => " + s))
        .subscribe();

次のように出力して欲しいです。

Received => Hi 0
Received => Hi 1
Received => Hi 2
Received => Hi 3
Received => Hi 4
Received => Hi 5
Received => Hi 6
Received => Hi 7
Received => Hi 8
Received => Hi 9

考えてみてください。