📝 BLOG.IK.AM

@making's memo

Timeout and retry with Spring's WebClient

🗃 {Programming/Java/org/springframework/web/reactive}

🏷 Reactor 🏷 Reactor Netty 🏷 Netty 🏷 Spring 5 🏷 Spring WebFlux 🏷 Java

🗓 Updated at 2019-01-02T10:25:27Z by Toshiaki Maki  🗓 Created at 2019-01-01T18:39:16Z by Toshiaki Maki


These are notes on timeouts and retries when using WebClient, the reactive HTTP client introduced in Spring 5.

The library versions used for verification are as follows.

  • Spring Boot 2.1.1.RELEASE
  • Spring Framework 5.1.3.RELEASE
  • Reactor Core 3.2.3.RELEASE
  • Reactor Netty 0.8.3.RELEASE
  • Reactor Extra 3.2.0.RELEASE
  • Netty 4.1.31.Final

Note that the code examples may not compile if the Reactor Netty version in particular is different.

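Example of a Connection Timeout

Consider a case where a connection timeout occurs. In the following code, 192.168.10.100 is the IP address of a machine/container that does not exist.
Picture a container you were connected to crashing and disappearing. I think this happens often in the container era.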

import reactor.core.publisher.Mono;

import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.create();
        Mono<String> response = webClient.get() //
                .uri("http://192.168.10.100:8088") //
                .retrieve() //
                .bodyToMono(String.class);
        response.log().subscribe();

        System.in.read();
    }
}

Running this produces the following log.

01:45:51.857 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
01:45:51.862 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded)
01:45:51.881 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [10cf09e8] HTTP GET http://192.168.10.100:8088
01:45:51.919 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for /192.168.10.100:8088
01:45:52.096 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 89636 (auto-detected)
01:45:52.121 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
01:45:52.161 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
01:45:52.163 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
01:45:52.166 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
01:45:52.222 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x70f773a3] Created new pooled channel, now 0 active connections and 1 inactive connections
01:45:52.260 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
01:45:52.260 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
01:45:52.261 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@767628ef
01:45:52.272 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x70f773a3] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.decompressor = io.netty.handler.codec.http.HttpContentDecompressor), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
01:46:22.285 [reactor-http-nio-3] ERROR reactor.Mono.FlatMap.1 - | onError(io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088)
01:46:22.286 [reactor-http-nio-3] ERROR reactor.Mono.FlatMap.1 - 
io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:466)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at java.base/java.lang.Thread.run(Thread.java:834)

The HTTP request was subscribed to, but a connection timeout occurred. The timeout fired 30 seconds after the HTTP request, and the Reactive Streams onError method was called. The exception raised is io.netty.channel.ConnectTimeoutException.
Presumably the default connection timeout is 30 seconds.
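
The 30-second figure can be checked against the timestamps in the log above. The following is a minimal sketch using only the JDK; the class name ElapsedFromLog and its helper are made up for illustration, and the timestamps are copied from the "HTTP GET", "Initialized pipeline" and onError lines.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper (not part of Spring, Reactor or Netty).
class ElapsedFromLog {

    // Elapsed time between two HH:mm:ss.SSS log timestamps taken on the same day.
    static Duration elapsed(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to));
    }

    public static void main(String[] args) {
        // "HTTP GET" line -> onError line
        System.out.println(elapsed("01:45:51.881", "01:46:22.285").toMillis() + " ms"); // 30404 ms
        // "Initialized pipeline" line -> onError line
        System.out.println(elapsed("01:45:52.272", "01:46:22.285").toMillis() + " ms"); // 30013 ms
    }
}
```

Measured from the point where the channel pipeline was initialized, the error arrives almost exactly 30 seconds later, which is consistent with a 30-second connect timeout.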

Using the timeout operator

30 seconds is long, so I want a 3-second timeout. Here the Reactor timeout(Duration) operator is used, as follows.

import java.time.Duration;

import reactor.core.publisher.Mono;

import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.create();
        Mono<String> response = webClient.get() //
                .uri("http://192.168.10.100:8088") //
                .retrieve() //
                .bodyToMono(String.class) //
                .timeout(Duration.ofSeconds(3)); // <-- timeout
        response.log().subscribe();

        System.in.read();
    }
}

Running this produces the following log.

01:47:54.982 [main] INFO reactor.Mono.Timeout.1 - onSubscribe(SerializedSubscriber)
01:47:54.985 [main] INFO reactor.Mono.Timeout.1 - request(unbounded)
01:47:55.013 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [10cf09e8] HTTP GET http://192.168.10.100:8088
01:47:55.021 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for /192.168.10.100:8088
01:47:55.082 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 90005 (auto-detected)
01:47:55.088 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
01:47:55.109 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
01:47:55.109 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
01:47:55.109 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
01:47:55.144 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xfc5a4111] Created new pooled channel, now 0 active connections and 1 inactive connections
01:47:55.174 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
01:47:55.174 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
01:47:55.176 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@11df4575
01:47:55.186 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xfc5a4111] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.decompressor = io.netty.handler.codec.http.HttpContentDecompressor), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
01:47:57.998 [parallel-1] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [10cf09e8] Cancel signal (to close connection)
01:47:57.998 [parallel-1] ERROR reactor.Mono.Timeout.1 - onError(java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured))
01:47:57.999 [parallel-1] ERROR reactor.Mono.Timeout.1 - 
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:288)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:273)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:390)
    at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

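The timeout fires 3 seconds after subscribe, and the onError method is called. The exception raised is java.util.concurrent.TimeoutException.

This is the expected behavior as far as it goes. The timeout operator is a general-purpose timeout: regardless of whether the target is an HTTP request, it cancels the processing once it exceeds the specified time.
As a result, it is hard to tell from the stack trace whether it was a connection timeout or a read timeout.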
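
To make the difference concrete, the exception types seen in this article can be sorted by walking the cause chain. This is only a sketch: FailureClassifier and its Kind enum are hypothetical names, and the Netty class is matched by name (an assumption made so the code compiles with the JDK alone).

```java
import java.net.ConnectException;
import java.util.concurrent.TimeoutException;

// Hypothetical classifier for illustration; not an API of Reactor or Netty.
class FailureClassifier {

    enum Kind { OPERATOR_TIMEOUT, CONNECT_TIMEOUT, CONNECT_FAILURE, OTHER }

    static Kind classify(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            // Raised by the timeout operator: it only says the deadline passed,
            // not which phase (connect or read) was still in progress.
            if (t instanceof TimeoutException) {
                return Kind.OPERATOR_TIMEOUT;
            }
            // Netty's connect timeout, compared by class name to avoid a Netty dependency.
            if ("io.netty.channel.ConnectTimeoutException".equals(t.getClass().getName())) {
                return Kind.CONNECT_TIMEOUT;
            }
            // Any other connect-level failure.
            if (t instanceof ConnectException) {
                return Kind.CONNECT_FAILURE;
            }
        }
        return Kind.OTHER;
    }
}
```

With such a classifier, a java.util.concurrent.TimeoutException always ends up in the generic OPERATOR_TIMEOUT bucket, which is exactly the loss of detail described above.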

Checking HTTP logs with wiretap

Setting wiretap(true) on Reactor Netty's TcpClient or HttpClient makes it log HTTP/TCP activity verbosely.
To configure the HttpClient used inside WebClient, write the following.

First, the case without a timeout.

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
                .clientConnector(
                        new ReactorClientHttpConnector(HttpClient.create().wiretap(true))) // <-- wiretap
                .build();
        Mono<String> response = webClient.get() //
                .uri("http://192.168.10.100:8088") //
                .retrieve() //
                .bodyToMono(String.class);
        response.log().subscribe();

        System.in.read();
    }
}

Running it produces the following log.

01:51:45.602 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
01:51:45.606 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded)
01:51:45.657 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1f010bf0] HTTP GET http://192.168.10.100:8088
01:51:45.689 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for /192.168.10.100:8088
01:51:45.890 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 90701 (auto-detected)
01:51:45.896 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
01:51:45.920 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
01:51:45.920 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
01:51:45.920 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
01:51:45.949 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf454c985] Created new pooled channel, now 0 active connections and 1 inactive connections
01:51:45.976 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
01:51:45.976 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
01:51:45.978 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6c29d2e0
01:51:45.984 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xf454c985] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
01:51:45.987 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf454c985] REGISTERED
01:51:45.988 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf454c985] CONNECT: /192.168.10.100:8088
01:52:15.991 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf454c985] CLOSE
01:52:15.991 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf454c985] UNREGISTERED
01:52:15.993 [reactor-http-nio-3] ERROR reactor.Mono.FlatMap.1 - | onError(io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088)
01:52:15.993 [reactor-http-nio-3] ERROR reactor.Mono.FlatMap.1 - 
io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:466)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at java.base/java.lang.Thread.run(Thread.java:834)

io.netty.handler.logging.LoggingHandler has been added to the pipeline shown in the BootstrapHandlers DEBUG log.

You can see that the channel is CLOSEd 30 seconds after CONNECT.

Next, the case with the timeout operator set.

import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
                .clientConnector(
                        new ReactorClientHttpConnector(HttpClient.create().wiretap(true)))
                .build();
        Mono<String> response = webClient.get() //
                .uri("http://192.168.10.100:8088") //
                .retrieve() //
                .bodyToMono(String.class) //
                .timeout(Duration.ofSeconds(3));
        response.log().subscribe();

        System.in.read();
    }
}

Running it produces the following log.

01:56:38.507 [main] INFO reactor.Mono.Timeout.1 - onSubscribe(SerializedSubscriber)
01:56:38.512 [main] INFO reactor.Mono.Timeout.1 - request(unbounded)
01:56:38.556 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1f010bf0] HTTP GET http://192.168.10.100:8088
01:56:38.573 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for /192.168.10.100:8088
01:56:38.712 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 91574 (auto-detected)
01:56:38.719 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
01:56:38.770 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
01:56:38.770 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
01:56:38.770 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
01:56:38.815 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xdc9b6f12] Created new pooled channel, now 0 active connections and 1 inactive connections
01:56:38.845 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
01:56:38.845 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
01:56:38.846 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@59e95cc9
01:56:38.854 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xdc9b6f12] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
01:56:38.858 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xdc9b6f12] REGISTERED
01:56:38.858 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xdc9b6f12] CONNECT: /192.168.10.100:8088
01:56:41.531 [parallel-1] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1f010bf0] Cancel signal (to close connection)
01:56:41.531 [parallel-1] ERROR reactor.Mono.Timeout.1 - onError(java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured))
01:56:41.532 [parallel-1] ERROR reactor.Mono.Timeout.1 - 
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:288)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:273)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:390)
    at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

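You can see that after CONNECT, processing is aborted 3 seconds after subscribe without waiting for CLOSE.

Setting the Connection Timeout at the Netty layer

The timeout operator is a high-level timeout. Next, set the connection timeout at the Netty layer.

Netty-layer settings can be applied to Reactor Netty's TcpClient. The following code sets the connection timeout to 500 ms.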

import java.time.Duration;

import io.netty.channel.ChannelOption;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(HttpClient.create()
                        .tcpConfiguration(tcpClient -> tcpClient
                                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)) // <-- connection timeout
                        .wiretap(true)))
                .build();
        Mono<String> response = webClient.get() //
                .uri("http://192.168.10.100:8088") //
                .retrieve() //
                .bodyToMono(String.class) //
                .timeout(Duration.ofSeconds(3));
        response.log().subscribe();

        System.in.read();
    }
}

Running it produces the following log.

02:20:31.417 [main] INFO reactor.Mono.Timeout.1 - onSubscribe(SerializedSubscriber)
02:20:31.419 [main] INFO reactor.Mono.Timeout.1 - request(unbounded)
02:20:31.439 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [10cf09e8] HTTP GET http://192.168.10.100:8088
02:20:31.447 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for /192.168.10.100:8088
02:20:31.782 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 95916 (auto-detected)
02:20:31.789 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
02:20:31.842 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
02:20:31.842 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
02:20:31.842 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
02:20:31.887 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xd0d6eda8] Created new pooled channel, now 0 active connections and 1 inactive connections
02:20:31.916 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
02:20:31.916 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
02:20:31.917 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@59e95cc9
02:20:31.926 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xd0d6eda8] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:20:31.930 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd0d6eda8] REGISTERED
02:20:31.931 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd0d6eda8] CONNECT: /192.168.10.100:8088
02:20:32.437 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd0d6eda8] CLOSE
02:20:32.438 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd0d6eda8] UNREGISTERED
02:20:32.439 [reactor-http-nio-3] ERROR reactor.Mono.Timeout.1 - onError(io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088)
02:20:32.439 [reactor-http-nio-3] ERROR reactor.Mono.Timeout.1 - 
io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:466)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at java.base/java.lang.Thread.run(Thread.java:834)

You can see that the channel is CLOSEd 500 ms after CONNECT and io.netty.channel.ConnectTimeoutException is raised.
Because the stream ended with an error before 3 seconds had passed, the timeout operator never fired.
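
The interaction of the two layers ("whichever fails first wins") can be sketched without Reactor or Netty. The code below swaps in the JDK's CompletableFuture (Java 9+) purely for illustration: the inner delayed failure stands in for the Netty connect timeout, and orTimeout stands in for the timeout operator. The class and method names are hypothetical.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// JDK-only simulation; not how WebClient is implemented.
class LayeredTimeoutSketch {

    // Returns the failure that reaches the caller first.
    static Throwable firstFailure(long innerFailAfterMs, long outerTimeoutMs) {
        CompletableFuture<String> call = new CompletableFuture<>();
        // Inner layer: simulated connect timeout after innerFailAfterMs.
        CompletableFuture.delayedExecutor(innerFailAfterMs, TimeUnit.MILLISECONDS)
                .execute(() -> call.completeExceptionally(
                        new ConnectException("connection timed out (simulated)")));
        // Outer layer: generic deadline, completes with TimeoutException if nothing happened yet.
        call.orTimeout(outerTimeoutMs, TimeUnit.MILLISECONDS);
        try {
            call.get();
            return null; // never reached in this sketch: the call cannot succeed
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same values as the article: 500 ms connect timeout inside a 3 s overall timeout.
        System.out.println(firstFailure(500, 3000));
    }
}
```

With a 500 ms inner limit and a 3 s outer limit the caller sees the connect-level exception; if the two values are swapped, the caller sees only the generic TimeoutException.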

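Example of Connection Refused

As a short digression, let's see what happens with Connection Refused instead of Connection Timeout.
With a connection timeout no response to the request ever comes back, whereas with connection refused a rejection is returned in response to the request.
Typical causes are a wrong port number or rejection by a firewall.

Change the destination in the code to localhost:8088. The assumption here is that the local machine is not listening on port 8088.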

import java.time.Duration;

import io.netty.channel.ChannelOption;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(HttpClient.create()
                        .tcpConfiguration(tcpClient -> tcpClient
                                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500))
                        .wiretap(true)))
                .build();
        Mono<String> response = webClient.get() //
                .uri("http://localhost:8088") //
                .retrieve() //
                .bodyToMono(String.class) //
                .timeout(Duration.ofSeconds(3));
        response.log().subscribe();

        System.in.read();
    }
}

Running it produces the following log.

02:02:06.047 [main] INFO reactor.Mono.Timeout.1 - onSubscribe(SerializedSubscriber)
02:02:06.048 [main] INFO reactor.Mono.Timeout.1 - request(unbounded)
02:02:06.068 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [10cf09e8] HTTP GET http://localhost:8088
02:02:06.077 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for localhost:8088
02:02:06.149 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 92560 (auto-detected)
02:02:06.159 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
02:02:06.195 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
02:02:06.195 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
02:02:06.195 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
02:02:06.248 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x345a4a73] Created new pooled channel, now 0 active connections and 1 inactive connections
02:02:06.319 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
02:02:06.320 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
02:02:06.325 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@1f38b6b1
02:02:06.344 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x345a4a73] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:02:06.362 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x345a4a73] REGISTERED
02:02:06.363 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x345a4a73] CONNECT: localhost/127.0.0.1:8088
02:02:06.379 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x345a4a73] CLOSE
02:02:06.380 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x345a4a73] UNREGISTERED
02:02:06.381 [reactor-http-nio-3] ERROR reactor.Mono.Timeout.1 - onError(io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8088)
02:02:06.381 [reactor-http-nio-3] ERROR reactor.Mono.Timeout.1 - 
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8088
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:636)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.ConnectException: Connection refused
    ... 10 common frames omitted

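You can see that the channel is CLOSEd a few ms after CONNECT and that io.netty.channel.AbstractChannel$AnnotatedConnectException (caused by java.net.ConnectException) is raised.
No connection timeout occurred. It is a different problem to begin with.

Retry handling

Connection timeouts and connection refused errors may resolve themselves as time passes,
for example when the container recovers or an iptables setting takes effect.
In such cases I want the client to retry so that it can recover autonomously.

Reactor provides the retry(long) and retry(Predicate) operators, but
for fine-grained control over retries it is better to define the retry specification with reactor.retry.Retry from Reactor Extra
and use the retryWhen operator.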
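
As a rough picture of what "at most N retries with a fixed backoff" means, here is a blocking, JDK-only sketch. It is not how Reactor implements retryWhen (Reactor schedules retries without blocking a thread); FixedBackoffRetry and its methods are hypothetical names used only for this illustration.

```java
import java.net.ConnectException;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Blocking illustration of retry semantics; not Reactor's implementation.
class FixedBackoffRetry {

    // Runs the task, retrying up to maxRetries times and sleeping for backoff between attempts.
    static <T> T call(Callable<T> task, int maxRetries, Duration backoff) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (retries >= maxRetries) {
                    throw e; // retries exhausted: propagate the last error
                }
                retries++;
                Thread.sleep(backoff.toMillis());
            }
        }
    }

    // Counts how many attempts are made when every attempt fails.
    static int attemptsWhenAlwaysFailing(int maxRetries, Duration backoff) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            call(() -> {
                attempts.incrementAndGet();
                throw new ConnectException("Connection refused (simulated)");
            }, maxRetries, backoff);
        } catch (Exception expected) {
            // every attempt fails by construction
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsWhenAlwaysFailing(2, Duration.ofMillis(500)) + " attempts");
    }
}
```

With a maximum of 2 retries there are 3 attempts in total (the initial call plus two retries), and the two 500 ms pauses add about 1 second of waiting.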

First, try the Connection Refused case.
The following code retries at most 2 times, with a 500 ms interval between attempts.

import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.retry.Backoff;
import reactor.retry.Retry;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
                .clientConnector(
                        new ReactorClientHttpConnector(HttpClient.create().wiretap(true)))
                .build();

        Retry<?> retry = Retry.any() //
                .retryMax(2) //
                .backoff(Backoff.fixed(Duration.ofMillis(500)));

        Mono<String> response = webClient.get() //
                .uri("http://localhost:8088") //
                .retrieve() //
                .bodyToMono(String.class) //
                .retryWhen(retry) // <-- retry
                .timeout(Duration.ofSeconds(3));
        response.log().subscribe(); // nothing runs until the Mono is subscribed

        System.in.read();
    }
}

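Because the timeout operator sits outside retryWhen, the whole sequence is aborted after 3 seconds no matter how many retries are made.

Running it produces the following log.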

02:11:02.622 [main] INFO reactor.Mono.RetryWhen.1 - onSubscribe(SerializedSubscriber)
02:11:02.623 [main] INFO reactor.Mono.RetryWhen.1 - request(unbounded)
02:11:02.655 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] HTTP GET http://localhost:8088
02:11:02.665 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for localhost:8088
02:11:02.779 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 94192 (auto-detected)
02:11:02.786 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
02:11:02.814 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
02:11:02.814 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
02:11:02.814 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
02:11:02.852 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x9b8d0dc4] Created new pooled channel, now 0 active connections and 1 inactive connections
02:11:02.945 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
02:11:02.945 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
02:11:02.947 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@4cf2f770
02:11:02.957 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x9b8d0dc4] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:11:02.964 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x9b8d0dc4] REGISTERED
02:11:02.965 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x9b8d0dc4] CONNECT: localhost/127.0.0.1:8088
02:11:02.972 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x9b8d0dc4] CLOSE
02:11:02.973 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x9b8d0dc4] UNREGISTERED
02:11:02.973 [reactor-http-nio-3] DEBUG reactor.retry.DefaultRetry - Scheduling retry attempt, retry context: iteration=1 exception=io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8088 backoff={500ms}
02:11:03.481 [parallel-1] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] HTTP GET http://localhost:8088
02:11:03.483 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf20df237] Created new pooled channel, now 0 active connections and 1 inactive connections
02:11:03.483 [reactor-http-nio-6] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xf20df237] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:11:03.483 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf20df237] REGISTERED
02:11:03.484 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf20df237] CONNECT: localhost/127.0.0.1:8088
02:11:03.484 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf20df237] CLOSE
02:11:03.485 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf20df237] UNREGISTERED
02:11:03.484 [reactor-http-nio-5] DEBUG reactor.retry.DefaultRetry - Scheduling retry attempt, retry context: iteration=2 exception=io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8088 backoff={500ms}
02:11:03.988 [parallel-2] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] HTTP GET http://localhost:8088
02:11:03.989 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x4e2e8307] Created new pooled channel, now 0 active connections and 1 inactive connections
02:11:03.989 [reactor-http-nio-8] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x4e2e8307] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:11:03.989 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x4e2e8307] REGISTERED
02:11:03.989 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x4e2e8307] CONNECT: localhost/127.0.0.1:8088
02:11:03.990 [reactor-http-nio-7] DEBUG reactor.retry.DefaultRetry - Retries exhausted, retry context: iteration=3 exception=io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8088 backoff={EXHAUSTED}
02:11:03.991 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x4e2e8307] CLOSE
02:11:03.992 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x4e2e8307] UNREGISTERED
02:11:03.994 [reactor-http-nio-7] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] Cancel signal (to close connection)
02:11:03.994 [reactor-http-nio-7] ERROR reactor.Mono.RetryWhen.1 - onError(reactor.retry.RetryExhaustedException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8088)
02:11:03.994 [reactor-http-nio-7] ERROR reactor.Mono.RetryWhen.1 - 
reactor.retry.RetryExhaustedException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8088
    at reactor.retry.DefaultRetry.retry(DefaultRetry.java:130)
    at reactor.retry.DefaultRetry.lambda$apply$1(DefaultRetry.java:115)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
    at reactor.core.publisher.FluxIndex$IndexSubscriber.onNext(FluxIndex.java:96)
    at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:333)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:142)
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:160)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1718)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:193)
    at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:100)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:167)
    at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:344)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:167)
    at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.operationComplete(PooledConnectionProvider.java:598)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485)
    at io.netty.util.concurrent.DefaultPromise.access$000(DefaultPromise.java:33)
    at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:435)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:466)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8088
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:636)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
    ... 2 common frames omitted
Caused by: java.net.ConnectException: Connection refused
    ... 10 common frames omitted

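The log shows that processing proceeds as follows.

CONNECT -> [2-10 ms] -> CLOSE -> [500 ms] -> CONNECT -> [2-10 ms] -> CLOSE -> [500 ms] -> CONNECT -> [2-10 ms] -> CLOSE -> onError(RetryExhaustedException)

After two retries, reactor.retry.RetryExhaustedException is raised. The elapsed time is a little over 1 s (about 1.4 s from onSubscribe to onError in the log above), well below the 3-second timeout.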
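
As a cross-check, the gaps in the timeline above can be recomputed from the log timestamps. The following is a minimal sketch using only java.time. The class name LogTimeline and the helper millisBetween are made up for this illustration, and the timestamps are copied by hand from the log above.

```java
import java.time.Duration;
import java.time.LocalTime;

// Sketch: recompute the gaps in the timeline from timestamps copied out of the log above.
// LogTimeline and millisBetween are illustrative names, not part of any library.
final class LogTimeline {

    // Milliseconds between two HH:mm:ss.SSS timestamps taken on the same day.
    static long millisBetween(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).toMillis();
    }

    public static void main(String[] args) {
        // 1st attempt: CONNECT -> CLOSE (connection refused)
        System.out.println("attempt 1: " + millisBetween("02:11:02.965", "02:11:02.972") + " ms"); // 7 ms
        // 1st backoff: "Scheduling retry attempt" -> next "HTTP GET"
        System.out.println("backoff 1: " + millisBetween("02:11:02.973", "02:11:03.481") + " ms"); // 508 ms
        // 2nd backoff: "Scheduling retry attempt" -> next "HTTP GET"
        System.out.println("backoff 2: " + millisBetween("02:11:03.484", "02:11:03.988") + " ms"); // 504 ms
        // Whole run: onSubscribe -> onError
        System.out.println("total:     " + millisBetween("02:11:02.622", "02:11:03.994") + " ms"); // 1372 ms
    }
}
```

The two backoffs account for roughly 1 s of the total. The refused connects themselves take only a few milliseconds each, and the rest is start-up overhead before the first CONNECT.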

Next, try the connection timeout case. Here the Netty connection timeout is deliberately left unconfigured.

import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.retry.Backoff;
import reactor.retry.Retry;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
                .clientConnector(
                        new ReactorClientHttpConnector(HttpClient.create().wiretap(true)))
                .build();

        Retry<?> retry = Retry.any() //
                .retryMax(2) //
                .backoff(Backoff.fixed(Duration.ofMillis(500)));

        Mono<String> response = webClient.get() //
                .uri("http://192.168.10.100:8088") //
                .retrieve() //
                .bodyToMono(String.class) //
                .retryWhen(retry) //
                .timeout(Duration.ofSeconds(3));
        response.log().subscribe();

        System.in.read();
    }
}

Running this produces the following log output.

02:15:44.339 [main] INFO reactor.Mono.Timeout.1 - onSubscribe(SerializedSubscriber)
02:15:44.341 [main] INFO reactor.Mono.Timeout.1 - request(unbounded)
02:15:44.390 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] HTTP GET http://192.168.10.100:8088
02:15:44.398 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for /192.168.10.100:8088
02:15:44.469 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 95045 (auto-detected)
02:15:44.481 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
02:15:44.509 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
02:15:44.509 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
02:15:44.509 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
02:15:44.537 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x8284305f] Created new pooled channel, now 0 active connections and 1 inactive connections
02:15:44.585 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
02:15:44.585 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
02:15:44.589 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@36c3f70c
02:15:44.605 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x8284305f] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:15:44.614 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x8284305f] REGISTERED
02:15:44.614 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x8284305f] CONNECT: /192.168.10.100:8088
02:15:47.360 [parallel-1] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] Cancel signal (to close connection)
02:15:47.362 [parallel-1] ERROR reactor.Mono.Timeout.1 - onError(java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'retryWhen' (and no fallback has been configured))
02:15:47.363 [parallel-1] ERROR reactor.Mono.Timeout.1 - 
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'retryWhen' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:288)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:273)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:390)
    at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

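The log shows that processing proceeds as follows.

CONNECT -> [3 s] -> onError(TimeoutException)

This is the same result as when the timeout operator was first introduced. Note that adding retry control does not change the outcome.

Retries are meant to kick in after a connect attempt fails, but the first connect attempt already exceeded the limit of the high-level timeout operator, so processing was aborted without a single retry.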
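
To see why no retry happens, it may help to model the timeline by hand. The sketch below is not Reactor code. It is a simplified, hypothetical model of retryWhen(retry) followed by timeout(t), assuming that every attempt takes the same time to fail, that the backoff is fixed, and that the timeout is counted from subscription. The names OuterTimeoutModel, simulate and HANGS are invented for this illustration.

```java
// Sketch: a simplified timeline model of retryWhen(retry).timeout(t). Not Reactor code.
final class OuterTimeoutModel {

    enum Outcome { TIMEOUT, RETRIES_EXHAUSTED }

    static final class Result {
        final Outcome outcome;
        final int attemptsStarted;
        final long elapsedMillis;

        Result(Outcome outcome, int attemptsStarted, long elapsedMillis) {
            this.outcome = outcome;
            this.attemptsStarted = attemptsStarted;
            this.elapsedMillis = elapsedMillis;
        }

        @Override
        public String toString() {
            return outcome + " after " + attemptsStarted + " attempt(s), " + elapsedMillis + " ms";
        }
    }

    // Marker for an attempt that never signals, e.g. a connect that hangs.
    static final long HANGS = Long.MAX_VALUE;

    static Result simulate(long attemptMillis, int maxRetries, long backoffMillis, long timeoutMillis) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        long now = 0; // time since subscription; always stays below timeoutMillis
        int attempts = 0;
        while (true) {
            attempts++;
            // The outer timeout fires first if the attempt has not failed by the deadline.
            if (attemptMillis >= timeoutMillis - now) {
                return new Result(Outcome.TIMEOUT, attempts, timeoutMillis);
            }
            now += attemptMillis;
            if (attempts > maxRetries) {
                return new Result(Outcome.RETRIES_EXHAUSTED, attempts, now);
            }
            // The deadline can also pass while waiting for the backoff.
            if (backoffMillis >= timeoutMillis - now) {
                return new Result(Outcome.TIMEOUT, attempts, timeoutMillis);
            }
            now += backoffMillis;
        }
    }

    public static void main(String[] args) {
        // Connection refused within about 10 ms: all three attempts fit into 3 s.
        System.out.println(simulate(10, 2, 500, 3000));
        // Connect hangs: the outer timeout fires during the first attempt.
        System.out.println(simulate(HANGS, 2, 500, 3000));
    }
}
```

In this model the refused case ends with RETRIES_EXHAUSTED after 3 attempts, while the hanging case ends with TIMEOUT after a single attempt, which matches the two logs above.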

Now swap the order of retryWhen and timeout.

import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.retry.Backoff;
import reactor.retry.Retry;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
            .clientConnector(
                new ReactorClientHttpConnector(HttpClient.create().wiretap(true)))
            .build();

        Retry<?> retry = Retry.any() //
            .retryMax(2) //
            .backoff(Backoff.fixed(Duration.ofMillis(500)));

        Mono<String> response = webClient.get() //
            .uri("http://192.168.10.100:8088") //
            .retrieve() //
            .bodyToMono(String.class) //
            .timeout(Duration.ofSeconds(3)) // <--- timeout now comes before retryWhen, so it applies to each attempt
            .retryWhen(retry);
        response.log().subscribe();

        System.in.read();
    }
}

Running this produces the following log output.

03:29:26.174 [main] INFO reactor.Mono.RetryWhen.1 - onSubscribe(SerializedSubscriber)
03:29:26.175 [main] INFO reactor.Mono.RetryWhen.1 - request(unbounded)
03:29:26.231 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] HTTP GET http://192.168.10.100:8088
03:29:26.238 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for /192.168.10.100:8088
03:29:26.308 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 8541 (auto-detected)
03:29:26.314 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
03:29:26.354 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
03:29:26.354 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
03:29:26.354 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
03:29:26.435 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x7c5211f4] Created new pooled channel, now 0 active connections and 1 inactive connections
03:29:26.474 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
03:29:26.475 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
03:29:26.476 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@dd79880
03:29:26.484 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x7c5211f4] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
03:29:26.489 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x7c5211f4] REGISTERED
03:29:26.489 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x7c5211f4] CONNECT: /192.168.10.100:8088
03:29:29.217 [parallel-1] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] Cancel signal (to close connection)
03:29:29.218 [parallel-1] DEBUG reactor.retry.DefaultRetry - Scheduling retry attempt, retry context: iteration=1 exception=java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured) backoff={500ms}
03:29:29.722 [parallel-2] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] HTTP GET http://192.168.10.100:8088
03:29:29.723 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x34119841] Created new pooled channel, now 0 active connections and 2 inactive connections
03:29:29.723 [reactor-http-nio-6] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x34119841] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
03:29:29.724 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0x34119841] REGISTERED
03:29:29.724 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0x34119841] CONNECT: /192.168.10.100:8088
03:29:32.725 [parallel-3] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] Cancel signal (to close connection)
03:29:32.725 [parallel-3] DEBUG reactor.retry.DefaultRetry - Scheduling retry attempt, retry context: iteration=2 exception=java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured) backoff={500ms}
03:29:33.228 [parallel-4] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] HTTP GET http://192.168.10.100:8088
03:29:33.229 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x88d2cad9] Created new pooled channel, now 0 active connections and 3 inactive connections
03:29:33.230 [reactor-http-nio-8] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x88d2cad9] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
03:29:33.230 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x88d2cad9] REGISTERED
03:29:33.230 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x88d2cad9] CONNECT: /192.168.10.100:8088
03:29:36.233 [parallel-5] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1a6c1270] Cancel signal (to close connection)
03:29:36.233 [parallel-5] DEBUG reactor.retry.DefaultRetry - Retries exhausted, retry context: iteration=3 exception=java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured) backoff={EXHAUSTED}
03:29:36.235 [parallel-5] ERROR reactor.Mono.RetryWhen.1 - onError(reactor.retry.RetryExhaustedException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured))
03:29:36.236 [parallel-5] ERROR reactor.Mono.RetryWhen.1 - 
reactor.retry.RetryExhaustedException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured)
    at reactor.retry.DefaultRetry.retry(DefaultRetry.java:130)
    at reactor.retry.DefaultRetry.lambda$apply$1(DefaultRetry.java:115)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
    at reactor.core.publisher.FluxIndex$IndexSubscriber.onNext(FluxIndex.java:96)
    at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:333)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:142)
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:160)
    at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:114)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:288)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:273)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:390)
    at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'flatMap' (and no fallback has been configured)
    ... 13 common frames omitted

The log shows that processing proceeds as follows.

CONNECT -> [3 s] -> cancel -> [500 ms] -> CONNECT -> [3 s] -> cancel -> [500 ms] -> CONNECT -> [3 s] -> cancel -> onError(RetryExhaustedException)

Because retryWhen is now outside timeout, the 3-second timeout fires three times in total, and the whole process takes about 10 seconds to finish.
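
The "about 10 seconds" figure can be reproduced with simple arithmetic. The sketch below assumes a fixed backoff and that every attempt runs into the timeout. WorstCaseLatency and its method names are hypothetical helpers written for this memo, not part of Reactor.

```java
import java.time.Duration;

// Sketch: upper bounds on total latency for the two operator orderings.
// Assumes a fixed backoff and that every attempt hits the timeout.
final class WorstCaseLatency {

    // timeout(...) placed before retryWhen(...): the timeout applies to each attempt.
    static Duration timeoutInsideRetry(Duration timeout, int maxRetries, Duration backoff) {
        Duration attempts = timeout.multipliedBy(maxRetries + 1L); // first try + retries
        Duration backoffs = backoff.multipliedBy(maxRetries);      // one backoff before each retry
        return attempts.plus(backoffs);
    }

    // retryWhen(...) placed before timeout(...): the timeout caps the whole sequence.
    static Duration timeoutOutsideRetry(Duration timeout) {
        return timeout;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(3);
        Duration backoff = Duration.ofMillis(500);
        // 3 attempts x 3 s + 2 backoffs x 500 ms
        System.out.println("timeout inside retry:  "
                + timeoutInsideRetry(timeout, 2, backoff).toMillis() + " ms"); // 10000 ms
        System.out.println("timeout outside retry: "
                + timeoutOutsideRetry(timeout).toMillis() + " ms");            // 3000 ms
    }
}
```

The 10,000 ms bound agrees with the log, where onSubscribe is at 03:29:26.174 and onError at 03:29:36.235.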

Finally, put retryWhen and timeout back in their original order and configure the Netty connection timeout again.

import java.time.Duration;

import io.netty.channel.ChannelOption;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.retry.Backoff;
import reactor.retry.Retry;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(HttpClient.create()
                        .tcpConfiguration(tcpClient -> tcpClient
                                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500))
                        .wiretap(true)))
                .build();

        Retry<?> retry = Retry.any() //
                .retryMax(2) //
                .backoff(Backoff.fixed(Duration.ofMillis(500)));

        Mono<String> response = webClient.get() //
                .uri("http://192.168.10.100:8088") //
                .retrieve() //
                .bodyToMono(String.class) //
                .retryWhen(retry) //
                .timeout(Duration.ofSeconds(3));
        response.log().subscribe();

        System.in.read();
    }
}

Running this produces the following log output.

02:18:30.424 [main] INFO reactor.Mono.Timeout.1 - onSubscribe(SerializedSubscriber)
02:18:30.427 [main] INFO reactor.Mono.Timeout.1 - request(unbounded)
02:18:30.480 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [18a136ac] HTTP GET http://192.168.10.100:8088
02:18:30.488 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for /192.168.10.100:8088
02:18:30.602 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 95546 (auto-detected)
02:18:30.609 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
02:18:30.638 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
02:18:30.639 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
02:18:30.639 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
02:18:30.697 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x62943ed1] Created new pooled channel, now 0 active connections and 1 inactive connections
02:18:30.732 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
02:18:30.732 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
02:18:30.733 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6d6af5d2
02:18:30.741 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x62943ed1] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:18:30.745 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x62943ed1] REGISTERED
02:18:30.745 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x62943ed1] CONNECT: /192.168.10.100:8088
02:18:31.252 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x62943ed1] CLOSE
02:18:31.253 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0x62943ed1] UNREGISTERED
02:18:31.254 [reactor-http-nio-3] DEBUG reactor.retry.DefaultRetry - Scheduling retry attempt, retry context: iteration=1 exception=io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088 backoff={500ms}
02:18:31.759 [parallel-2] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [18a136ac] HTTP GET http://192.168.10.100:8088
02:18:31.760 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xc1752cf7] Created new pooled channel, now 0 active connections and 1 inactive connections
02:18:31.760 [reactor-http-nio-6] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xc1752cf7] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:18:31.761 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xc1752cf7] REGISTERED
02:18:31.761 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xc1752cf7] CONNECT: /192.168.10.100:8088
02:18:32.265 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xc1752cf7] CLOSE
02:18:32.266 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xc1752cf7] UNREGISTERED
02:18:32.266 [reactor-http-nio-5] DEBUG reactor.retry.DefaultRetry - Scheduling retry attempt, retry context: iteration=2 exception=io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088 backoff={500ms}
02:18:32.770 [parallel-3] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [18a136ac] HTTP GET http://192.168.10.100:8088
02:18:32.771 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x1962ceda] Created new pooled channel, now 0 active connections and 1 inactive connections
02:18:32.772 [reactor-http-nio-8] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x1962ceda] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
02:18:32.772 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x1962ceda] REGISTERED
02:18:32.772 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x1962ceda] CONNECT: /192.168.10.100:8088
02:18:33.278 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x1962ceda] CLOSE
02:18:33.278 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0x1962ceda] UNREGISTERED
02:18:33.279 [reactor-http-nio-7] DEBUG reactor.retry.DefaultRetry - Retries exhausted, retry context: iteration=3 exception=io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088 backoff={EXHAUSTED}
02:18:33.282 [reactor-http-nio-7] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [18a136ac] Cancel signal (to close connection)
02:18:33.282 [reactor-http-nio-7] ERROR reactor.Mono.Timeout.1 - onError(reactor.retry.RetryExhaustedException: io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088)
02:18:33.283 [reactor-http-nio-7] ERROR reactor.Mono.Timeout.1 - 
reactor.retry.RetryExhaustedException: io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088
    at reactor.retry.DefaultRetry.retry(DefaultRetry.java:130)
    at reactor.retry.DefaultRetry.lambda$apply$1(DefaultRetry.java:115)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
    at reactor.core.publisher.FluxIndex$IndexSubscriber.onNext(FluxIndex.java:96)
    at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:333)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:142)
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:160)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1718)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:193)
    at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:100)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:167)
    at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:344)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:167)
    at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.operationComplete(PooledConnectionProvider.java:598)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485)
    at io.netty.util.concurrent.DefaultPromise.access$000(DefaultPromise.java:33)
    at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:435)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:466)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.10.100:8088
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
    ... 5 common frames omitted

The log shows that processing proceeds as follows.

CONNECT -> [500 ms] -> CLOSE -> [500 ms] -> CONNECT -> [500 ms] -> CLOSE -> [500 ms] -> CONNECT -> [500 ms] -> CLOSE -> onError(RetryExhaustedException)

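As in the Connection Refused case, reactor.retry.RetryExhaustedException is raised after two retries.
This time, however, each attempt waits out the 500 ms Connection Timeout, so the elapsed time is about 2.8 s. That is still shorter than the timeout operator's limit, so no TimeoutException occurs.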

This example is probably close to the timeout configuration one would normally expect.
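To pick the outer timeout, it can help to write down the worst-case time budget explicitly. The following is only a minimal sketch in plain JDK code: `RetryBudget` and `worstCase` are hypothetical names that do not appear in Reactor, and the sketch assumes every attempt runs into the per-attempt timeout and ignores scheduling overhead (which is presumably why the measured time above is somewhat higher than the calculated one).

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of Reactor or Spring.
final class RetryBudget {

    /**
     * Worst-case elapsed time when every attempt runs into the per-attempt
     * timeout: (retryMax + 1) attempts plus retryMax backoff pauses.
     */
    static Duration worstCase(Duration perAttemptTimeout, Duration backoff, int retryMax) {
        int attempts = retryMax + 1; // the initial attempt plus the retries
        return perAttemptTimeout.multipliedBy(attempts)
                .plus(backoff.multipliedBy(retryMax));
    }

    public static void main(String[] args) {
        // Values taken from the example: 500 ms Connection Timeout,
        // 500 ms fixed backoff, retryMax(2), 3 s outer timeout.
        Duration total = worstCase(Duration.ofMillis(500), Duration.ofMillis(500), 2);
        Duration outer = Duration.ofSeconds(3);
        System.out.println("worst case: " + total.toMillis() + " ms");
        System.out.println("fits inside outer timeout: " + (total.compareTo(outer) < 0));
    }
}
```

With the values from the example the budget is 3 x 500 ms + 2 x 500 ms = 2500 ms, which leaves some headroom under the 3 s outer timeout.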

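Instead of configuring Netty's Connection Timeout, you can place a timeout operator both before and after retryWhen:

            .timeout(Duration.ofMillis(500))
            .retryWhen(retry) //
            .timeout(Duration.ofSeconds(3));

This achieves a similar timeout, but the previous code is preferable because its stack trace is easier to read.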

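To make retries behave as intended, you need to understand the difference between the inner timeout and the outer timeout: the inner one (before retryWhen) limits each individual attempt, while the outer one (after retryWhen) limits the whole sequence, including retries and backoff.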
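The effect of the placement can be illustrated with a small timeline model. This is a plain-JDK sketch, not Reactor code: `TimeoutPlacementModel`, `Outcome`, and `simulate` are hypothetical names, and the model assumes an upstream that never responds, so an attempt can only end through a timeout. It also assumes that when both timeouts would fire at the same instant, the outer one wins.

```java
import java.time.Duration;
import java.util.Optional;

// Plain-JDK model (not Reactor code) of inner vs. outer timeout placement.
// Assumption: the upstream never responds.
final class TimeoutPlacementModel {

    static final class Outcome {
        final int attemptsStarted;
        final String terminalSignal;
        final long elapsedMillis;

        Outcome(int attemptsStarted, String terminalSignal, long elapsedMillis) {
            this.attemptsStarted = attemptsStarted;
            this.terminalSignal = terminalSignal;
            this.elapsedMillis = elapsedMillis;
        }

        @Override
        public String toString() {
            return attemptsStarted + " attempt(s), " + terminalSignal
                    + " after " + elapsedMillis + " ms";
        }
    }

    static Outcome simulate(Optional<Duration> inner, Duration backoff,
            int retryMax, Duration outer) {
        long outerMs = outer.toMillis();
        if (!inner.isPresent()) {
            // Without an inner timeout the first attempt never fails, so the
            // retry logic never sees an error; only the outer timeout fires.
            return new Outcome(1, "TimeoutException (outer)", outerMs);
        }
        long innerMs = inner.get().toMillis();
        long backoffMs = backoff.toMillis();
        long now = 0;
        for (int attempt = 1; attempt <= retryMax + 1; attempt++) {
            now += innerMs; // this attempt is cut off by the inner timeout
            if (now >= outerMs) {
                return new Outcome(attempt, "TimeoutException (outer)", outerMs);
            }
            if (attempt <= retryMax) {
                now += backoffMs; // pause before the next attempt
                if (now >= outerMs) {
                    return new Outcome(attempt, "TimeoutException (outer)", outerMs);
                }
            }
        }
        return new Outcome(retryMax + 1, "RetryExhaustedException", now);
    }

    public static void main(String[] args) {
        Duration ms500 = Duration.ofMillis(500);
        // Outer timeout only: a single attempt, no retries at all.
        System.out.println(simulate(Optional.empty(), ms500, 2, Duration.ofSeconds(3)));
        // Inner 500 ms + outer 3 s: all three attempts run, retries are exhausted.
        System.out.println(simulate(Optional.of(ms500), ms500, 2, Duration.ofSeconds(3)));
        // Outer timeout too short: it cuts the sequence off during the backoff.
        System.out.println(simulate(Optional.of(ms500), ms500, 2, Duration.ofMillis(1800)));
    }
}
```

In this model, dropping the inner timeout means a hanging upstream is never retried, and an outer timeout shorter than the worst-case budget ends the sequence before the retries are used up.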

Ideally a circuit breaker would then be wrapped around all of this, but that is a topic for another occasion.

(Bonus) Configuring Read Timeout / Write Timeout at the Netty layer

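While we are at it, here is how to configure the timeouts that apply after a Connection has been established: the timeout for receiving the response and the timeout for sending the request body.
These are also configured at the Netty layer.

Netty Handlers to be applied after the Connection is established can be added in TcpClient's doOnConnected method.

The following code deliberately sets a small Read Timeout and accesses a service that takes one second to respond.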

package am.ik.blog;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.retry.Backoff;
import reactor.retry.Retry;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class Demoo {

    public static void main(String[] args) throws Exception {
        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(HttpClient.create()
                        .tcpConfiguration(tcpClient -> tcpClient
                                // Connection Timeout: give up connecting after 500 ms
                                .bootstrap(bootstrap -> bootstrap.option(
                                        ChannelOption.CONNECT_TIMEOUT_MILLIS, 500))
                                // Once connected, add the Read/Write Timeout handlers
                                // (deliberately small: 100 ms each)
                                .doOnConnected(connection -> connection
                                        .addHandler(new ReadTimeoutHandler(100,
                                                TimeUnit.MILLISECONDS))
                                        .addHandler(new WriteTimeoutHandler(100,
                                                TimeUnit.MILLISECONDS))))
                        // Log the bytes sent and received on the wire
                        .wiretap(true)))
                .build();

        Retry<?> retry = Retry.any() //
                .retryMax(2) //
                .backoff(Backoff.fixed(Duration.ofMillis(500)));

        Mono<String> response = webClient.get() //
                .uri("http://httpbin.org/delay/1") //
                .retrieve() //
                .bodyToMono(String.class) //
                .retryWhen(retry) //
                .timeout(Duration.ofSeconds(3));
        response.log().subscribe();

        System.in.read();
    }
}

Running it produces the following log.

19:05:33.719 [main] INFO reactor.Mono.Timeout.1 - onSubscribe(SerializedSubscriber)
19:05:33.721 [main] INFO reactor.Mono.Timeout.1 - request(unbounded)
19:05:33.798 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [66ac5762] HTTP GET http://httpbin.org/delay/1
19:05:33.802 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [http] for httpbin.org:80
19:05:33.892 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 39209 (auto-detected)
19:05:33.897 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: ac:de:48:ff:fe:00:11:22 (auto-detected)
19:05:33.915 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
19:05:33.915 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
19:05:33.915 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
19:05:33.950 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf59eb6bc] Created new pooled channel, now 0 active connections and 1 inactive connections
19:05:34.007 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
19:05:34.007 [reactor-http-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
19:05:34.010 [reactor-http-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@444ca454
19:05:34.035 [reactor-http-nio-4] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xf59eb6bc] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:34.055 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc] REGISTERED
19:05:34.057 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc] CONNECT: httpbin.org/54.165.51.142:80
19:05:34.258 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] ACTIVE
19:05:34.258 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] onStateChange(PooledConnection{channel=[id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80]}, [connected])
19:05:34.271 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] onStateChange(GET{uri=/, connection=PooledConnection{channel=[id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80]}}, [configured])
19:05:34.273 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] Registering pool release on close event for channel
19:05:34.274 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClientConnect - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] Handler is being applied: {uri=http://httpbin.org/delay/1, method=GET}
19:05:34.284 [reactor-http-nio-4] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] Writing object DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
GET /delay/1 HTTP/1.1
user-agent: ReactorNetty/0.8.3.RELEASE
host: httpbin.org
accept: */*
19:05:34.291 [reactor-http-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
19:05:34.291 [reactor-http-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
19:05:34.291 [reactor-http-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
19:05:34.291 [reactor-http-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
19:05:34.306 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] WRITE: 97B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 64 65 6c 61 79 2f 31 20 48 54 54 |GET /delay/1 HTT|
|00000010| 50 2f 31 2e 31 0d 0a 75 73 65 72 2d 61 67 65 6e |P/1.1..user-agen|
|00000020| 74 3a 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f |t: ReactorNetty/|
|00000030| 30 2e 38 2e 33 2e 52 45 4c 45 41 53 45 0d 0a 68 |0.8.3.RELEASE..h|
|00000040| 6f 73 74 3a 20 68 74 74 70 62 69 6e 2e 6f 72 67 |ost: httpbin.org|
|00000050| 0d 0a 61 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d |..accept: */*...|
|00000060| 0a                                              |.               |
+--------+-------------------------------------------------+----------------+
19:05:34.307 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] FLUSH
19:05:34.313 [reactor-http-nio-4] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] Writing object EmptyLastHttpContent
19:05:34.313 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] WRITE: 0B
19:05:34.313 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] FLUSH
19:05:34.313 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] onStateChange(GET{uri=/delay/1, connection=PooledConnection{channel=[id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80]}}, [request_sent])
19:05:34.319 [reactor-http-nio-4] DEBUG reactor.netty.ReactorNetty - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] Added encoder [ReadTimeoutHandler] at the beginning of the user pipeline, full pipeline: [reactor.left.loggingHandler, reactor.left.httpCodec, ReadTimeoutHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
19:05:34.319 [reactor-http-nio-4] DEBUG reactor.netty.ReactorNetty - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] Added encoder [WriteTimeoutHandler] at the beginning of the user pipeline, full pipeline: [reactor.left.loggingHandler, reactor.left.httpCodec, WriteTimeoutHandler, ReadTimeoutHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
19:05:34.319 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] Channel connected, now 1 active connections and 0 inactive connections
19:05:34.420 [reactor-http-nio-4] ERROR reactor.netty.resources.PooledConnectionProvider - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] Pooled connection observed an error
io.netty.handler.timeout.ReadTimeoutException: null
19:05:34.422 [reactor-http-nio-4] DEBUG reactor.retry.DefaultRetry - Scheduling retry attempt, retry context: iteration=1 exception=io.netty.handler.timeout.ReadTimeoutException backoff={500ms}
19:05:34.423 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc, L:/192.168.11.38:61666 - R:httpbin.org/54.165.51.142:80] CLOSE
19:05:34.424 [reactor-http-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf59eb6bc, L:/192.168.11.38:61666 ! R:httpbin.org/54.165.51.142:80] Channel cleaned, now 0 active connections and 1 inactive connections
19:05:34.424 [reactor-http-nio-4] DEBUG reactor.netty.ReactorNetty - [id: 0xf59eb6bc, L:/192.168.11.38:61666 ! R:httpbin.org/54.165.51.142:80] Non Removed handler: ReadTimeoutHandler, context: ChannelHandlerContext(ReadTimeoutHandler, [id: 0xf59eb6bc, L:/192.168.11.38:61666 ! R:httpbin.org/54.165.51.142:80]), pipeline: DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (WriteTimeoutHandler = io.netty.handler.timeout.WriteTimeoutHandler), (ReadTimeoutHandler = io.netty.handler.timeout.ReadTimeoutHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:34.424 [reactor-http-nio-4] DEBUG reactor.netty.ReactorNetty - [id: 0xf59eb6bc, L:/192.168.11.38:61666 ! R:httpbin.org/54.165.51.142:80] Non Removed handler: WriteTimeoutHandler, context: ChannelHandlerContext(WriteTimeoutHandler, [id: 0xf59eb6bc, L:/192.168.11.38:61666 ! R:httpbin.org/54.165.51.142:80]), pipeline: DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (WriteTimeoutHandler = io.netty.handler.timeout.WriteTimeoutHandler), (ReadTimeoutHandler = io.netty.handler.timeout.ReadTimeoutHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:34.425 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc, L:/192.168.11.38:61666 ! R:httpbin.org/54.165.51.142:80] INACTIVE
19:05:34.425 [reactor-http-nio-4] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf59eb6bc, L:/192.168.11.38:61666 ! R:httpbin.org/54.165.51.142:80] UNREGISTERED
19:05:34.927 [parallel-2] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [66ac5762] HTTP GET http://httpbin.org/delay/1
19:05:34.928 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf2bc38c9] Created new pooled channel, now 0 active connections and 2 inactive connections
19:05:34.928 [reactor-http-nio-6] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xf2bc38c9] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:34.929 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9] REGISTERED
19:05:34.929 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9] CONNECT: httpbin.org/54.165.51.142:80
19:05:35.087 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] ACTIVE
19:05:35.087 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] onStateChange(PooledConnection{channel=[id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80]}, [connected])
19:05:35.087 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] onStateChange(GET{uri=/, connection=PooledConnection{channel=[id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80]}}, [configured])
19:05:35.087 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] Registering pool release on close event for channel
19:05:35.087 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClientConnect - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] Handler is being applied: {uri=http://httpbin.org/delay/1, method=GET}
19:05:35.087 [reactor-http-nio-6] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] Writing object DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
GET /delay/1 HTTP/1.1
user-agent: ReactorNetty/0.8.3.RELEASE
host: httpbin.org
accept: */*
19:05:35.099 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] WRITE: 97B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 64 65 6c 61 79 2f 31 20 48 54 54 |GET /delay/1 HTT|
|00000010| 50 2f 31 2e 31 0d 0a 75 73 65 72 2d 61 67 65 6e |P/1.1..user-agen|
|00000020| 74 3a 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f |t: ReactorNetty/|
|00000030| 30 2e 38 2e 33 2e 52 45 4c 45 41 53 45 0d 0a 68 |0.8.3.RELEASE..h|
|00000040| 6f 73 74 3a 20 68 74 74 70 62 69 6e 2e 6f 72 67 |ost: httpbin.org|
|00000050| 0d 0a 61 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d |..accept: */*...|
|00000060| 0a                                              |.               |
+--------+-------------------------------------------------+----------------+
19:05:35.099 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] FLUSH
19:05:35.100 [reactor-http-nio-6] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] Writing object EmptyLastHttpContent
19:05:35.100 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] WRITE: 0B
19:05:35.100 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] FLUSH
19:05:35.100 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] onStateChange(GET{uri=/delay/1, connection=PooledConnection{channel=[id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80]}}, [request_sent])
19:05:35.100 [reactor-http-nio-6] DEBUG reactor.netty.ReactorNetty - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] Added encoder [ReadTimeoutHandler] at the beginning of the user pipeline, full pipeline: [reactor.left.loggingHandler, reactor.left.httpCodec, ReadTimeoutHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
19:05:35.100 [reactor-http-nio-6] DEBUG reactor.netty.ReactorNetty - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] Added encoder [WriteTimeoutHandler] at the beginning of the user pipeline, full pipeline: [reactor.left.loggingHandler, reactor.left.httpCodec, WriteTimeoutHandler, ReadTimeoutHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
19:05:35.100 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] Channel connected, now 1 active connections and 1 inactive connections
19:05:35.203 [reactor-http-nio-6] ERROR reactor.netty.resources.PooledConnectionProvider - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] Pooled connection observed an error
io.netty.handler.timeout.ReadTimeoutException: null
19:05:35.205 [reactor-http-nio-6] DEBUG reactor.retry.DefaultRetry - Scheduling retry attempt, retry context: iteration=2 exception=io.netty.handler.timeout.ReadTimeoutException backoff={500ms}
19:05:35.206 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9, L:/192.168.11.38:61667 - R:httpbin.org/54.165.51.142:80] CLOSE
19:05:35.207 [reactor-http-nio-6] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xf2bc38c9, L:/192.168.11.38:61667 ! R:httpbin.org/54.165.51.142:80] Channel cleaned, now 0 active connections and 2 inactive connections
19:05:35.208 [reactor-http-nio-6] DEBUG reactor.netty.ReactorNetty - [id: 0xf2bc38c9, L:/192.168.11.38:61667 ! R:httpbin.org/54.165.51.142:80] Non Removed handler: ReadTimeoutHandler, context: ChannelHandlerContext(ReadTimeoutHandler, [id: 0xf2bc38c9, L:/192.168.11.38:61667 ! R:httpbin.org/54.165.51.142:80]), pipeline: DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (WriteTimeoutHandler = io.netty.handler.timeout.WriteTimeoutHandler), (ReadTimeoutHandler = io.netty.handler.timeout.ReadTimeoutHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:35.210 [reactor-http-nio-6] DEBUG reactor.netty.ReactorNetty - [id: 0xf2bc38c9, L:/192.168.11.38:61667 ! R:httpbin.org/54.165.51.142:80] Non Removed handler: WriteTimeoutHandler, context: ChannelHandlerContext(WriteTimeoutHandler, [id: 0xf2bc38c9, L:/192.168.11.38:61667 ! R:httpbin.org/54.165.51.142:80]), pipeline: DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (WriteTimeoutHandler = io.netty.handler.timeout.WriteTimeoutHandler), (ReadTimeoutHandler = io.netty.handler.timeout.ReadTimeoutHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:35.210 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9, L:/192.168.11.38:61667 ! R:httpbin.org/54.165.51.142:80] INACTIVE
19:05:35.210 [reactor-http-nio-6] DEBUG reactor.netty.http.client.HttpClient - [id: 0xf2bc38c9, L:/192.168.11.38:61667 ! R:httpbin.org/54.165.51.142:80] UNREGISTERED
19:05:35.706 [parallel-3] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [66ac5762] HTTP GET http://httpbin.org/delay/1
19:05:35.708 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xd44acee5] Created new pooled channel, now 0 active connections and 3 inactive connections
19:05:35.708 [reactor-http-nio-8] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xd44acee5] Initialized pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:35.708 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5] REGISTERED
19:05:35.708 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5] CONNECT: httpbin.org/54.165.51.142:80
19:05:35.865 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] ACTIVE
19:05:35.866 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] onStateChange(PooledConnection{channel=[id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80]}, [connected])
19:05:35.866 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] onStateChange(GET{uri=/, connection=PooledConnection{channel=[id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80]}}, [configured])
19:05:35.866 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] Registering pool release on close event for channel
19:05:35.866 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClientConnect - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] Handler is being applied: {uri=http://httpbin.org/delay/1, method=GET}
19:05:35.866 [reactor-http-nio-8] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] Writing object DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
GET /delay/1 HTTP/1.1
user-agent: ReactorNetty/0.8.3.RELEASE
host: httpbin.org
accept: */*
19:05:35.878 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] WRITE: 97B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 64 65 6c 61 79 2f 31 20 48 54 54 |GET /delay/1 HTT|
|00000010| 50 2f 31 2e 31 0d 0a 75 73 65 72 2d 61 67 65 6e |P/1.1..user-agen|
|00000020| 74 3a 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f |t: ReactorNetty/|
|00000030| 30 2e 38 2e 33 2e 52 45 4c 45 41 53 45 0d 0a 68 |0.8.3.RELEASE..h|
|00000040| 6f 73 74 3a 20 68 74 74 70 62 69 6e 2e 6f 72 67 |ost: httpbin.org|
|00000050| 0d 0a 61 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d |..accept: */*...|
|00000060| 0a                                              |.               |
+--------+-------------------------------------------------+----------------+
19:05:35.879 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] FLUSH
19:05:35.879 [reactor-http-nio-8] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] Writing object EmptyLastHttpContent
19:05:35.879 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] WRITE: 0B
19:05:35.879 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] FLUSH
19:05:35.879 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] onStateChange(GET{uri=/delay/1, connection=PooledConnection{channel=[id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80]}}, [request_sent])
19:05:35.879 [reactor-http-nio-8] DEBUG reactor.netty.ReactorNetty - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] Added encoder [ReadTimeoutHandler] at the beginning of the user pipeline, full pipeline: [reactor.left.loggingHandler, reactor.left.httpCodec, ReadTimeoutHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
19:05:35.880 [reactor-http-nio-8] DEBUG reactor.netty.ReactorNetty - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] Added encoder [WriteTimeoutHandler] at the beginning of the user pipeline, full pipeline: [reactor.left.loggingHandler, reactor.left.httpCodec, WriteTimeoutHandler, ReadTimeoutHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
19:05:35.880 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] Channel connected, now 1 active connections and 2 inactive connections
19:05:35.984 [reactor-http-nio-8] ERROR reactor.netty.resources.PooledConnectionProvider - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] Pooled connection observed an error
io.netty.handler.timeout.ReadTimeoutException: null
19:05:35.985 [reactor-http-nio-8] DEBUG reactor.retry.DefaultRetry - Retries exhausted, retry context: iteration=3 exception=io.netty.handler.timeout.ReadTimeoutException backoff={EXHAUSTED}
19:05:35.988 [reactor-http-nio-8] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [66ac5762] Cancel signal (to close connection)
19:05:35.989 [reactor-http-nio-8] ERROR reactor.Mono.Timeout.1 - onError(reactor.retry.RetryExhaustedException: io.netty.handler.timeout.ReadTimeoutException)
19:05:35.989 [reactor-http-nio-8] ERROR reactor.Mono.Timeout.1 - 
reactor.retry.RetryExhaustedException: io.netty.handler.timeout.ReadTimeoutException
    at reactor.retry.DefaultRetry.retry(DefaultRetry.java:130)
    at reactor.retry.DefaultRetry.lambda$apply$1(DefaultRetry.java:115)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
    at reactor.core.publisher.FluxIndex$IndexSubscriber.onNext(FluxIndex.java:96)
    at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:333)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:142)
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:160)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1718)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:193)
    at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:100)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:167)
    at reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:376)
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:350)
    at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onUncaughtException(PooledConnectionProvider.java:493)
    at reactor.netty.resources.PooledConnectionProvider$PooledConnection.onUncaughtException(PooledConnectionProvider.java:402)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:176)
    at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:364)
    at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:396)
    at reactor.netty.channel.ChannelOperationsHandler.exceptionCaught(ChannelOperationsHandler.java:185)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
    at io.netty.handler.timeout.ReadTimeoutHandler.readTimedOut(ReadTimeoutHandler.java:98)
    at io.netty.handler.timeout.ReadTimeoutHandler.channelIdle(ReadTimeoutHandler.java:90)
    at io.netty.handler.timeout.IdleStateHandler$ReaderIdleTimeoutTask.run(IdleStateHandler.java:494)
    at io.netty.handler.timeout.IdleStateHandler$AbstractIdleTask.run(IdleStateHandler.java:466)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:466)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.netty.handler.timeout.ReadTimeoutException: null
19:05:35.989 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5, L:/192.168.11.38:61668 - R:httpbin.org/54.165.51.142:80] CLOSE
19:05:35.989 [reactor-http-nio-8] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xd44acee5, L:/192.168.11.38:61668 ! R:httpbin.org/54.165.51.142:80] Channel cleaned, now 0 active connections and 3 inactive connections
19:05:35.989 [reactor-http-nio-8] DEBUG reactor.netty.ReactorNetty - [id: 0xd44acee5, L:/192.168.11.38:61668 ! R:httpbin.org/54.165.51.142:80] Non Removed handler: ReadTimeoutHandler, context: ChannelHandlerContext(ReadTimeoutHandler, [id: 0xd44acee5, L:/192.168.11.38:61668 ! R:httpbin.org/54.165.51.142:80]), pipeline: DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (WriteTimeoutHandler = io.netty.handler.timeout.WriteTimeoutHandler), (ReadTimeoutHandler = io.netty.handler.timeout.ReadTimeoutHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:35.990 [reactor-http-nio-8] DEBUG reactor.netty.ReactorNetty - [id: 0xd44acee5, L:/192.168.11.38:61668 ! R:httpbin.org/54.165.51.142:80] Non Removed handler: WriteTimeoutHandler, context: ChannelHandlerContext(WriteTimeoutHandler, [id: 0xd44acee5, L:/192.168.11.38:61668 ! R:httpbin.org/54.165.51.142:80]), pipeline: DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (WriteTimeoutHandler = io.netty.handler.timeout.WriteTimeoutHandler), (ReadTimeoutHandler = io.netty.handler.timeout.ReadTimeoutHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:05:35.990 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5, L:/192.168.11.38:61668 ! R:httpbin.org/54.165.51.142:80] INACTIVE
19:05:35.990 [reactor-http-nio-8] DEBUG reactor.netty.http.client.HttpClient - [id: 0xd44acee5, L:/192.168.11.38:61668 ! R:httpbin.org/54.165.51.142:80] UNREGISTERED

An io.netty.handler.timeout.ReadTimeoutException occurs, and after two retries a RetryExhaustedException is raised.
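
To make the counting concrete: the log's iteration=3 can be read as one initial attempt plus two retries, all of which failed. The following is a minimal plain-Java sketch of that "retry N times, then give up" behavior. It is not Reactor Extra's implementation, and the names RetryCountSketch, RetriesExhausted, callWithRetry and attemptsUntilExhausted are hypothetical, introduced only for illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryCountSketch {

    /** Hypothetical stand-in for RetryExhaustedException; wraps the last failure. */
    public static class RetriesExhausted extends RuntimeException {
        public RetriesExhausted(Throwable cause) {
            super(cause);
        }
    }

    /** Calls the task once, then retries up to maxRetries more times on failure. */
    public static <T> T callWithRetry(Callable<T> task, int maxRetries) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        // Every attempt failed: surface the last failure as the cause.
        throw new RetriesExhausted(last);
    }

    /** Runs a task that always fails and returns how often it was attempted. */
    public static int attemptsUntilExhausted(int maxRetries) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            callWithRetry(() -> {
                attempts.incrementAndGet();
                // Simulated failure standing in for a read timeout.
                throw new IOException("simulated read timeout");
            }, maxRetries);
        } catch (RetriesExhausted expected) {
            // Expected here: the task never succeeds.
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilExhausted(2));
    }
}
```

In this sketch the original failure stays reachable as the cause of the wrapper exception, which mirrors the "Caused by" line in the stack trace above.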

(Bonus) Spring Boot Auto Configuration

With Spring Boot, if you define a ClientHttpConnector as a bean, it is automatically applied to WebClient.Builder.

https://github.com/spring-projects/spring-boot/blob/v2.1.1.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/web/reactive/function/client/ClientHttpConnectorAutoConfiguration.java

Define a ReactorClientHttpConnector like this:

@Bean
public ReactorClientHttpConnector reactorClientHttpConnector() {
    return new ReactorClientHttpConnector(HttpClient.create()
            .tcpConfiguration(tcpClient -> tcpClient
                    // Connection timeout: 2 seconds
                    .bootstrap(bootstrap -> bootstrap
                            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000))
                    // Read and write timeouts: 3 seconds each
                    .doOnConnected(connection -> connection
                            .addHandler(new ReadTimeoutHandler(3, TimeUnit.SECONDS))
                            .addHandler(
                                    new WriteTimeoutHandler(3, TimeUnit.SECONDS)))));
}

On the side that uses WebClient, simply inject WebClient.Builder.

private final WebClient webClient;

public FooService(WebClient.Builder webClientBuilder) {
    this.webClient = webClientBuilder.build();
}

See the official documentation for details.