--- title: kafka-nativeとTestcontainersで瞬時にKafkaにアクセスできるSpring Boot開発環境を作る tags: ["Spring Boot", "Kafka", "Testcontainers", "Spring for Apache Kafka"] categories: ["Programming", "Java", "org", "springframework", "kafka"] date: 2025-05-16T04:10:31Z updated: 2025-05-16T04:12:16Z --- [kafka-native](https://hub.docker.com/r/apache/kafka-native)はGraalVMでコンパイルされたApache KafkaのNative Imageです。 これを使うとKafkaの起動が非常に早くなります。 Testcontainersからこれを使うことで、瞬時にKafkaにアクセスできるSpring Boot開発環境を用意できます。 ちなみに、Testcontainersはテスト以外にローカル開発でも使えます。 > [!NOTE] > 仕組みを知りたい方は[こちら](/entries/825)の記事を参照してください。 また、Spring Initializrでプロジェクトの雛形を作ると、kafka-nativeを使うための設定が自動で生成されるので、 記事タイトルの環境を作る手間は実は0です(Dockerのみ必要)。 ### Consumerアプリの作成 Spring Initializrでアプリの雛形を作成します。 ``` curl https://start.spring.io/starter.tgz \ -d artifactId=demo-kafka-consumer \ -d baseDir=demo-kafka-consumer \ -d packageName=com.example \ -d dependencies=kafka,testcontainers,web,actuator \ -d type=maven-project \ -d applicationName=DemoKafkaConsumerApplication | tar -xzvf - cd demo-kafka-consumer ``` `src/test/java`以下にTestcontainersの設定がされています。`apache/kafka-native:latest`のイメージが使われています。 ```java $ cat src/test/java/com/example/TestDemoKafkaConsumerApplication.java package com.example; import org.springframework.boot.SpringApplication; public class TestDemoKafkaConsumerApplication { public static void main(String[] args) { SpringApplication.from(DemoKafkaConsumerApplication::main).with(TestcontainersConfiguration.class).run(args); } } $ cat src/test/java/com/example/TestcontainersConfiguration.java package com.example; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; import org.springframework.context.annotation.Bean; import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; @TestConfiguration(proxyBeanMethods = false) class TestcontainersConfiguration { @Bean @ServiceConnection KafkaContainer kafkaContainer() { return new KafkaContainer(DockerImageName.parse("apache/kafka-native:latest")); } } ``` 次にConsumer用のサンプルコードを作成します。[こちらの記事](/entries/804)で作成したコードを再利用します。 ```java cat <<'EOF' > src/main/java/com/example/ConsumerController.java package com.example; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ConsumerController { private final List messages = new CopyOnWriteArrayList<>(); private final Logger log = LoggerFactory.getLogger(ConsumerController.class); @GetMapping(path = "") public List getMessages() { return this.messages; } @KafkaListener(topics = "${demo.topic}") public void onMessage(String message) { log.info("onMessage({})", message); this.messages.add(message); } } EOF ``` ```properties cat <<'EOF' > src/main/resources/application.properties demo.topic=demo server.port=8082 spring.application.name=demo-kafka-consumer spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.group-id=${spring.application.name} EOF ``` ### 動作確認 ここまで作成したアプリをTestcontainersを使って試します。 IDEであれば、`src/test/java`配下の`TestDemoKafkaConsumerApplication`の`main`メソッドを実行すれば良いです。 Maven Pluginを使う場合は、以下のコマンドで実行できます。 ```bash ./mvnw spring-boot:test-run ``` 次のような起動ログとともにすぐにアプリは起動します。 ``` . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v3.4.5) 2025-05-16T13:04:24.718+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : Starting DemoKafkaConsumerApplication using Java 21.0.6 with PID 36472 (/Users/toshiaki/git/demo-kafka-consumer/target/classes started by toshiaki in /Users/toshiaki/git/demo-kafka-consumer) 2025-05-16T13:04:24.718+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : No active profile set, falling back to 1 default profile: "default" 2025-05-16T13:04:25.037+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8082 (http) 2025-05-16T13:04:25.041+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2025-05-16T13:04:25.041+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.40] 2025-05-16T13:04:25.057+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2025-05-16T13:04:25.057+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 326 ms 2025-05-16T13:04:25.102+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.images.PullPolicy : Image pull policy will be performed by: DefaultPullPolicy() 2025-05-16T13:04:25.103+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.ImageNameSubstitutor : Image name substitution will be performed by: DefaultImageNameSubstitutor (composite of 'ConfigurationFileImageNameSubstitutor' and 'PrefixingImageNameSubstitutor') 2025-05-16T13:04:25.108+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Testcontainers version: 1.20.6 2025-05-16T13:04:25.170+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.d.DockerClientProviderStrategy : Loaded org.testcontainers.dockerclient.UnixSocketClientProviderStrategy from ~/.testcontainers.properties, will try it first 2025-05-16T13:04:25.255+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.d.DockerClientProviderStrategy : Found Docker environment with local Unix socket (unix:///var/run/docker.sock) 2025-05-16T13:04:25.255+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Docker host IP address is localhost 2025-05-16T13:04:25.267+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Connected to docker: Server Version: 27.5.1 API Version: 1.47 Operating System: OrbStack Total Memory: 96439 MB 2025-05-16T13:04:25.300+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Creating container for image: testcontainers/ryuk:0.11.0 2025-05-16T13:04:25.319+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.RegistryAuthLocator : Credential helper/store (docker-credential-osxkeychain) does not have credentials for https://index.docker.io/v1/ 2025-05-16T13:04:25.401+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Container testcontainers/ryuk:0.11.0 is starting: 4db5d8b10e4204e5654150e6a653fe59c74def47ddca213ab76b48ef4f5a977b 2025-05-16T13:04:25.578+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Container testcontainers/ryuk:0.11.0 started in PT0.278707S 2025-05-16T13:04:25.580+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.RyukResourceReaper : Ryuk started - will monitor and terminate Testcontainers containers on JVM exit 2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Checking the system... 2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : ✔︎ Docker server version should be at least 1.6.0 2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Creating container for image: apache/kafka-native:latest 2025-05-16T13:04:25.609+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Container apache/kafka-native:latest is starting: 90fdfa0b9b144fa2905fca8b0ee467ca84cec471320a2bdfa61852f68bc79990 2025-05-16T13:04:25.949+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Container apache/kafka-native:latest started in PT0.367594S 2025-05-16T13:04:26.105+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 1 endpoint beneath base path '/actuator' 2025-05-16T13:04:26.130+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8082 (http) with context path '/' 2025-05-16T13:04:26.141+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.include.jmx.reporter = true auto.offset.reset = earliest bootstrap.servers = [localhost:32769] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-demo-kafka-consumer-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false enable.metrics.push = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = demo-kafka-consumer group.instance.id = null group.protocol = classic group.remote.assignor = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metadata.recovery.strategy = none metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.max.ms = 1000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.connect.timeout.ms = null sasl.login.read.timeout.ms = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.login.retry.backoff.max.ms = 10000 sasl.login.retry.backoff.ms = 100 sasl.mechanism = GSSAPI sasl.oauthbearer.clock.skew.seconds = 30 sasl.oauthbearer.expected.audience = null sasl.oauthbearer.expected.issuer = null sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 sasl.oauthbearer.jwks.endpoint.url = null sasl.oauthbearer.scope.claim.name = scope sasl.oauthbearer.sub.claim.name = sub sasl.oauthbearer.token.endpoint.url = null security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 45000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2025-05-16T13:04:26.153+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector 2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.8.1 2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 70d6ff42debf7e17 2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1747368266194 2025-05-16T13:04:26.201+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Subscribed to topic(s): demo 2025-05-16T13:04:26.208+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : Started DemoKafkaConsumerApplication in 1.598 seconds (process running for 1.703) 2025-05-16T13:04:26.295+09:00 WARN 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Error while fetching metadata with correlation id 2 : {demo=UNKNOWN_TOPIC_OR_PARTITION} 2025-05-16T13:04:26.295+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Cluster ID: 4L6g3nShT-eMCtK--X86sw 2025-05-16T13:04:26.411+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Discovered group coordinator localhost:32769 (id: 2147483646 rack: null) 2025-05-16T13:04:26.412+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group 2025-05-16T13:04:26.420+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Request joining group due to: need to re-join with the given member-id: consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee 2025-05-16T13:04:26.421+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group 2025-05-16T13:04:26.423+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'} 2025-05-16T13:04:26.425+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Finished assignment for group at generation 1: {consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee=Assignment(partitions=[demo-0])} 2025-05-16T13:04:26.435+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'} 2025-05-16T13:04:26.435+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Notifying assignor about the new Assignment(partitions=[demo-0]) 2025-05-16T13:04:26.436+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] k.c.c.i.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Adding newly assigned partitions: demo-0 2025-05-16T13:04:26.440+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Found no committed offset for partition demo-0 2025-05-16T13:04:26.442+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Resetting offset for partition demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:32769 (id: 1 rack: null)], epoch=0}}. 2025-05-16T13:04:26.443+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : demo-kafka-consumer: partitions assigned: [demo-0] ``` Kafkaは`kafka-native`のイメージを使って起動し、アクセス可能なポートは次のログからわかります。 ``` bootstrap.servers = [localhost:32769] ``` あるいは次のコマンドでも確認できます。 ```bash $ docker ps | grep kafka-native 90fdfa0b9b14 apache/kafka-native:latest "sh -c 'while [ ! -f…" About a minute ago Up About a minute 0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp funny_pascal ``` KafkaのCLIを使って、メッセージを送ります。 KafkaのCLIはbrewでインストールできます。 ```bash brew install kafka ``` 次のコマンドでメッセージを送ります。 ```bash cat <