IK.AM

@making's tech note


How to use the TCP client provided by Spring Framework (Part 1/2)

🗃 {Programming/Java/org/springframework/messaging/tcp}
🏷 Java 🏷 Spring 🏷 TCP 
🗓 Updated at 2015-10-04T02:01:18Z  🗓 Created at 2015-10-04T02:01:18Z   🌎 English Page

When you want to use not HTTP client but TCP client, org.springframework.messaging.tcp.TcpOperations interface is available. org.springframework.messaging.tcp.reactor.Reactor2TcpClient is provided as a implementation of TcpOperations which is using Reactor. Reactor2TcpClient is released at Spring 4.2. (Reactor11TcpClient had been provided since Spring 4.0. It disappeared at 4.2!!)

However how to use it is not documented. (Even unit test doesn't exist!) This issue can be a hint.

I tried using Reactor2TcpClient.

package demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.SocketUtils;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class DemoTcpApplication {

    @Slf4j
    @Configuration
    public static class AppConfig {
        int port = SocketUtils.findAvailableTcpPort();

        @Bean(destroyMethod = "shutdown")
        TcpOperations<String> tcpOperations() {
            return new Reactor2TcpClient<>("localhost", port,
                    new Codec<Buffer, Message<String>, Message<String>>() {
                        @Override
                        public Function<Buffer, Message<String>> decoder(Consumer<Message<String>> next) {
                            return bytes -> MessageBuilder.withPayload(bytes.asString()).build();
                        }

                        @Override
                        public Buffer apply(Message<String> message) {
                            return Buffer.wrap(message.getPayload());
                        }
                    });
        }

        @Bean(initMethod = "start", destroyMethod = "stop")
        HelloServer testTcpServer() throws IOException {
            return new HelloServer(port);
        }

        @Bean
        InitializingBean sampleRunner(TcpOperations<String> tcpOperations) {
            return () -> {
                log.info("start");
                IntStream.range(0, 10).forEach(i -> {
                    tcpOperations.connect(new TcpConnectionHandler<String>() {
                        @Override
                        public void afterConnected(TcpConnection<String> connection) {
                            connection.send(MessageBuilder.withPayload("Hello!" + i + "\n").build());
                        }

                        @Override
                        public void afterConnectFailure(Throwable ex) {
                            // NO-OP
                        }

                        @Override
                        public void handleMessage(Message<String> message) {
                            log.info(message.getPayload());
                        }

                        @Override
                        public void handleFailure(Throwable ex) {
                            // NO-OP
                        }

                        @Override
                        public void afterConnectionClosed() {
                            // NO-OP
                        }
                    });
                });
                log.info("end");
            };
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // System.setProperty("reactor.tcp.ioThreadCount", "10");
        new AnnotationConfigApplicationContext(AppConfig.class)
                .registerShutdownHook();
        TimeUnit.SECONDS.sleep(1);
        System.exit(0);
    }
}

HelloServer is an echo TCP server. In this program, a TCP message is handled as String. If other type is appropriate (ex. byte[]), pass the Codec (ex. Codec<Buffer, Message<byte[]>, Message<byte[]>>) to the constructor of Reactor2TcpClient.

This program actually works as follows:

[main] INFO org.springframework.context.annotation.AnnotationConfigApplicationContext - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Sun Oct 04 11:28:14 JST 2015]; root of context hierarchy
[main] INFO demo.HelloServer - listen 22192
[main] INFO demo.DemoTcpApplication$AppConfig - start
[main] INFO demo.DemoTcpApplication$AppConfig - end
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!1
[reactor-tcp-io-4] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!3
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!0
[reactor-tcp-io-3] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!2
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!4
[reactor-tcp-io-3] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!6
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!8
[reactor-tcp-io-4] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!7
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!5
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!9
[Thread-4] INFO org.springframework.context.annotation.AnnotationConfigApplicationContext - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Sun Oct 04 11:28:14 JST 2015]; root of context hierarchy
[Thread-4] INFO demo.HelloServer - stop...

As you can see, handling a connection is complicated.

I've created a helper class to create TcpConnectionHandler called TcpConnectionHandlerBuilder.

package demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;

import java.util.function.Consumer;

@Slf4j
public class TcpConnectionHandlerBuilder<T> {
    private Consumer<TcpConnection<T>> onConnect;
    private Consumer<Message<T>> onMessage;
    private Consumer<Throwable> onConnectFailure;
    private Consumer<Throwable> onFailure;
    private Runnable onClosed;

    public TcpConnectionHandlerBuilder<T> onConnect(
            Consumer<TcpConnection<T>> onConnected) {
        this.onConnect = onConnected;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onMessage(Consumer<Message<T>> onMessage) {
        this.onMessage = onMessage;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onConnectFailure(Consumer<Throwable> onConnectFailure) {
        this.onConnectFailure = onConnectFailure;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onFailure(Consumer<Throwable> onFailure) {
        this.onFailure = onFailure;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onClosed(Runnable onClosed) {
        this.onClosed = onClosed;
        return this;
    }

    public TcpConnectionHandler<T> build() {
        return new TcpConnectionHandler<T>() {
            @Override
            public void afterConnected(TcpConnection<T> connection) {
                log.trace("afterConnected {}", connection);
                if (TcpConnectionHandlerBuilder.this.onConnect != null) {
                    TcpConnectionHandlerBuilder.this.onConnect.accept(connection);
                }
            }

            @Override
            public void afterConnectFailure(Throwable ex) {
                log.trace("afterConnectFailure", ex);
                if (TcpConnectionHandlerBuilder.this.onConnectFailure != null) {
                    TcpConnectionHandlerBuilder.this.onConnectFailure.accept(ex);
                }
            }

            @Override
            public void handleMessage(Message<T> message) {
                log.trace("handleMessage {}", message);
                if (TcpConnectionHandlerBuilder.this.onMessage != null) {
                    TcpConnectionHandlerBuilder.this.onMessage.accept(message);
                }
            }

            @Override
            public void handleFailure(Throwable ex) {
                log.trace("handleFailure", ex);
                if (TcpConnectionHandlerBuilder.this.onFailure != null) {
                    TcpConnectionHandlerBuilder.this.onFailure.accept(ex);
                }
            }

            @Override
            public void afterConnectionClosed() {
                log.trace("afterConnectionClosed");
                if (TcpConnectionHandlerBuilder.this.onClosed != null) {
                    TcpConnectionHandlerBuilder.this.onClosed.run();
                }
            }
        };
    }
}

Using this class, connecting code can be simplified like the following:

tcpOperations.connect(new TcpConnectionHandlerBuilder<String>()
        .onConnect(c -> c.send(MessageBuilder.withPayload("Hello!" + i + "\n").build()))
        .onMessage(m -> log.info(m.getPayload()))
        .build());

This looks good for the high level TCP client. However even though this is simple tcp programming, following dependencies are required (Netty! GS Collections!).

image

I think this is somewhat exaggerated.

In the next entry, I'll introduce another way to use TCP client using Spring Integration.

Sample code shown in this entry is available here.


✒️️ Edit  ⏰ History  🗑 Delete