Building a Spring Boot development environment with instant access to Kafka using kafka-native and Testcontainers

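kafka-native is a Native Image of Apache Kafka compiled with GraalVM. With it, Kafka starts very quickly: in the run shown later in this article, the container started in about 0.37 seconds.

Using it from Testcontainers gives you a Spring Boot development environment with instant access to Kafka.

Note that Testcontainers can be used for local development as well as for tests.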

Note

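If you want to know how this works, see this article.

Also, when you generate a project skeleton with Spring Initializr, the configuration for kafka-native is generated automatically, so building the environment in the title takes practically no effort (only Docker is required).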

Creating the Consumer app

Generate the app skeleton with Spring Initializr.

curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-consumer \
       -d baseDir=demo-kafka-consumer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer

The Testcontainers configuration is generated under src/test/java. It uses the apache/kafka-native:latest image.

$ cat src/test/java/com/example/TestDemoKafkaConsumerApplication.java 
package com.example;

import org.springframework.boot.SpringApplication;

public class TestDemoKafkaConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.from(DemoKafkaConsumerApplication::main).with(TestcontainersConfiguration.class).run(args);
	}

}

$ cat src/test/java/com/example/TestcontainersConfiguration.java     
package com.example;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {

	@Bean
	@ServiceConnection
	KafkaContainer kafkaContainer() {
		return new KafkaContainer(DockerImageName.parse("apache/kafka-native:latest"));
	}

}

Next, create the sample code for the Consumer. It reuses the code created in this article.

cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {
	private final List<String> messages = new CopyOnWriteArrayList<>();

	private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

	@GetMapping(path = "")
	public List<String> getMessages() {
		return this.messages;
	}

	@KafkaListener(topics = "${demo.topic}")
	public void onMessage(String message) {
		log.info("onMessage({})", message);
		this.messages.add(message);
	}
}
EOF
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF
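In application.properties above, `spring.kafka.consumer.group-id` is set to `${spring.application.name}`, so the consumer group ID ends up as `demo-kafka-consumer` (this matches the `group.id` value in the startup log). The following is a deliberately simplified sketch of that kind of placeholder substitution using only the JDK. `PlaceholderSketch` is a hypothetical class for illustration and is not how Spring implements it; Spring's own resolver handles more cases, such as default values.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; Spring's real placeholder resolution is more capable.
class PlaceholderSketch {

	// Matches ${some.key}
	private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

	// Replaces each ${key} with the value of key from props; unknown keys are left as-is.
	static String resolve(String value, Properties props) {
		Matcher matcher = PLACEHOLDER.matcher(value);
		StringBuilder result = new StringBuilder();
		while (matcher.find()) {
			String replacement = props.getProperty(matcher.group(1), matcher.group());
			matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
		}
		matcher.appendTail(result);
		return result.toString();
	}

	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("spring.application.name", "demo-kafka-consumer");
		System.out.println(resolve("${spring.application.name}", props));
	}
}
```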

Trying it out

Now try the app built so far using Testcontainers. In an IDE, run the main method of TestDemoKafkaConsumerApplication under src/test/java. With the Maven plugin, run the following command.

./mvnw spring-boot:test-run

The app starts right away, with a startup log like the following.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.4.5)

2025-05-16T13:04:24.718+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : Starting DemoKafkaConsumerApplication using Java 21.0.6 with PID 36472 (/Users/toshiaki/git/demo-kafka-consumer/target/classes started by toshiaki in /Users/toshiaki/git/demo-kafka-consumer)
2025-05-16T13:04:24.718+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : No active profile set, falling back to 1 default profile: "default"
2025-05-16T13:04:25.037+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8082 (http)
2025-05-16T13:04:25.041+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-05-16T13:04:25.041+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.40]
2025-05-16T13:04:25.057+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-05-16T13:04:25.057+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 326 ms
2025-05-16T13:04:25.102+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.images.PullPolicy     : Image pull policy will be performed by: DefaultPullPolicy()
2025-05-16T13:04:25.103+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.ImageNameSubstitutor         : Image name substitution will be performed by: DefaultImageNameSubstitutor (composite of 'ConfigurationFileImageNameSubstitutor' and 'PrefixingImageNameSubstitutor')
2025-05-16T13:04:25.108+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Testcontainers version: 1.20.6
2025-05-16T13:04:25.170+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.d.DockerClientProviderStrategy       : Loaded org.testcontainers.dockerclient.UnixSocketClientProviderStrategy from ~/.testcontainers.properties, will try it first
2025-05-16T13:04:25.255+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.d.DockerClientProviderStrategy       : Found Docker environment with local Unix socket (unix:///var/run/docker.sock)
2025-05-16T13:04:25.255+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Docker host IP address is localhost
2025-05-16T13:04:25.267+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Connected to docker: 
  Server Version: 27.5.1
  API Version: 1.47
  Operating System: OrbStack
  Total Memory: 96439 MB
2025-05-16T13:04:25.300+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Creating container for image: testcontainers/ryuk:0.11.0
2025-05-16T13:04:25.319+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.RegistryAuthLocator          : Credential helper/store (docker-credential-osxkeychain) does not have credentials for https://index.docker.io/v1/
2025-05-16T13:04:25.401+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Container testcontainers/ryuk:0.11.0 is starting: 4db5d8b10e4204e5654150e6a653fe59c74def47ddca213ab76b48ef4f5a977b
2025-05-16T13:04:25.578+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Container testcontainers/ryuk:0.11.0 started in PT0.278707S
2025-05-16T13:04:25.580+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.RyukResourceReaper           : Ryuk started - will monitor and terminate Testcontainers containers on JVM exit
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Checking the system...
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : ✔︎ Docker server version should be at least 1.6.0
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Creating container for image: apache/kafka-native:latest
2025-05-16T13:04:25.609+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Container apache/kafka-native:latest is starting: 90fdfa0b9b144fa2905fca8b0ee467ca84cec471320a2bdfa61852f68bc79990
2025-05-16T13:04:25.949+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Container apache/kafka-native:latest started in PT0.367594S
2025-05-16T13:04:26.105+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 1 endpoint beneath base path '/actuator'
2025-05-16T13:04:26.130+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8082 (http) with context path '/'
2025-05-16T13:04:26.141+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.include.jmx.reporter = true
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:32769]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-demo-kafka-consumer-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	enable.metrics.push = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = demo-kafka-consumer
	group.instance.id = null
	group.protocol = classic
	group.remote.assignor = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metadata.recovery.strategy = none
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.max.ms = 1000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 45000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2025-05-16T13:04:26.153+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1747368266194
2025-05-16T13:04:26.201+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Subscribed to topic(s): demo
2025-05-16T13:04:26.208+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : Started DemoKafkaConsumerApplication in 1.598 seconds (process running for 1.703)
2025-05-16T13:04:26.295+09:00  WARN 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Error while fetching metadata with correlation id 2 : {demo=UNKNOWN_TOPIC_OR_PARTITION}
2025-05-16T13:04:26.295+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Cluster ID: 4L6g3nShT-eMCtK--X86sw
2025-05-16T13:04:26.411+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Discovered group coordinator localhost:32769 (id: 2147483646 rack: null)
2025-05-16T13:04:26.412+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.420+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Request joining group due to: need to re-join with the given member-id: consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee
2025-05-16T13:04:26.421+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.423+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.425+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Finished assignment for group at generation 1: {consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee=Assignment(partitions=[demo-0])}
2025-05-16T13:04:26.435+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.435+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Notifying assignor about the new Assignment(partitions=[demo-0])
2025-05-16T13:04:26.436+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] k.c.c.i.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Adding newly assigned partitions: demo-0
2025-05-16T13:04:26.440+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Found no committed offset for partition demo-0
2025-05-16T13:04:26.442+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Resetting offset for partition demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:32769 (id: 1 rack: null)], epoch=0}}.
2025-05-16T13:04:26.443+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : demo-kafka-consumer: partitions assigned: [demo-0]
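The Testcontainers lines in the log above report each container's startup time as an ISO-8601 duration (`PT0.278707S` for ryuk and `PT0.367594S` for kafka-native). As a small sketch, `java.time.Duration` can convert these values to milliseconds. `StartupTimes` is a hypothetical helper name and is not part of the generated project.

```java
import java.time.Duration;

// Hypothetical helper for reading the "started in PT..." values from the log.
class StartupTimes {

	// Parses an ISO-8601 duration such as "PT0.367594S" and returns whole milliseconds.
	static long millis(String isoDuration) {
		return Duration.parse(isoDuration).toMillis();
	}

	public static void main(String[] args) {
		System.out.println("ryuk: " + millis("PT0.278707S") + " ms");
		System.out.println("kafka-native: " + millis("PT0.367594S") + " ms");
	}
}
```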

Kafka is started from the kafka-native image, and the port it can be reached on is shown in the following log line.

	bootstrap.servers = [localhost:32769]

Alternatively, you can check it with the following command.

$ docker ps | grep kafka-native                                                              
90fdfa0b9b14   apache/kafka-native:latest   "sh -c 'while [ ! -f…"   About a minute ago   Up About a minute   0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp   funny_pascal
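Because the host port is assigned at container start, it can differ from run to run. If you want to pick it out of the `docker ps` output programmatically, a sketch like the following could work. `MappedPortParser` is a hypothetical helper, and it assumes the ports column has the `host:port->containerPort/tcp` shape shown above.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the generated project.
class MappedPortParser {

	// Matches the "32769->9092/tcp" part of entries such as "0.0.0.0:32769->9092/tcp".
	private static final Pattern MAPPING = Pattern.compile("(\\d+)->(\\d+)/tcp");

	// Returns the first host port mapped to the given container port, if any.
	static OptionalInt hostPortFor(String portsColumn, int containerPort) {
		Matcher matcher = MAPPING.matcher(portsColumn);
		while (matcher.find()) {
			if (Integer.parseInt(matcher.group(2)) == containerPort) {
				return OptionalInt.of(Integer.parseInt(matcher.group(1)));
			}
		}
		return OptionalInt.empty();
	}

	public static void main(String[] args) {
		String ports = "0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp";
		System.out.println(hostPortFor(ports, 9092));
	}
}
```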

Use the Kafka CLI to send messages.

The Kafka CLI can be installed with brew.

brew install kafka

Send messages with the following command. Replace 32769 with the port from your own run.

cat <<EOF | kafka-console-producer --bootstrap-server localhost:32769 --topic demo
Hello World 1
Hello World 2
Hello World 3
EOF

The app log shows that the messages were received.

2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 1)
2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 2)
2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 3)

The messages can also be retrieved over HTTP with the following command.

$ curl -s http://localhost:8082 | jq . 
[
  "Hello World 1",
  "Hello World 2",
  "Hello World 3"
]
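The same request can also be made from Java. This is a minimal sketch using the JDK's built-in `java.net.http.HttpClient` (Java 11 or later). `MessagesClient` is a hypothetical class name, and the sketch assumes the app is listening on port 8082 as set in application.properties.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the GET endpoint exposed by ConsumerController.
class MessagesClient {

	// Builds a GET request for the root path on localhost.
	static HttpRequest request(int port) {
		return HttpRequest.newBuilder(URI.create("http://localhost:" + port + "/")).GET().build();
	}

	// Sends the request and returns the raw JSON response body.
	static String fetch(int port) throws IOException, InterruptedException {
		HttpClient client = HttpClient.newHttpClient();
		return client.send(request(port), HttpResponse.BodyHandlers.ofString()).body();
	}

	public static void main(String[] args) throws Exception {
		// Requires the consumer app to be running.
		System.out.println(fetch(8082));
	}
}
```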

With kafka-native, we built a Spring Boot development environment with instant access to Kafka. Testcontainers makes it easy to start Kafka in a local development environment, not only in tests, so give it a try.