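kafka-native is a Native Image of Apache Kafka compiled with GraalVM.
With it, Kafka starts up very quickly.
Using it from Testcontainers gives you a Spring Boot development environment with near-instant access to Kafka.
Incidentally, Testcontainers can be used for local development as well as for tests.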
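Note
If you want to know how this works, see this article.
Also, when you generate a project skeleton with Spring Initializr, the configuration for using kafka-native is generated automatically,
so setting up the environment in the article title actually takes zero effort (only Docker is required).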
Creating the Consumer app
Create the app skeleton with Spring Initializr.
curl https://start.spring.io/starter.tgz \
-d artifactId=demo-kafka-consumer \
-d baseDir=demo-kafka-consumer \
-d packageName=com.example \
-d dependencies=kafka,testcontainers,web,actuator \
-d type=maven-project \
-d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer
The Testcontainers configuration is generated under src/test/java. It uses the apache/kafka-native:latest image.
$ cat src/test/java/com/example/TestDemoKafkaConsumerApplication.java
package com.example;

import org.springframework.boot.SpringApplication;

public class TestDemoKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.from(DemoKafkaConsumerApplication::main).with(TestcontainersConfiguration.class).run(args);
    }

}
$ cat src/test/java/com/example/TestcontainersConfiguration.java
package com.example;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {

    @Bean
    @ServiceConnection
    KafkaContainer kafkaContainer() {
        return new KafkaContainer(DockerImageName.parse("apache/kafka-native:latest"));
    }

}
Next, create the sample Consumer code. It reuses the code written in this article.
cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {

    // Received messages are kept in memory; CopyOnWriteArrayList is safe to read while the listener thread appends.
    private final List<String> messages = new CopyOnWriteArrayList<>();

    private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

    // Returns every message received so far as a JSON array.
    @GetMapping(path = "")
    public List<String> getMessages() {
        return this.messages;
    }

    // Invoked for each record on the topic named by the demo.topic property.
    @KafkaListener(topics = "${demo.topic}")
    public void onMessage(String message) {
        log.info("onMessage({})", message);
        this.messages.add(message);
    }

}
EOF
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF
Trying it out
Now run the app created so far using Testcontainers.
In an IDE, just run the main method of TestDemoKafkaConsumerApplication under src/test/java.
With the Maven plugin, you can run it with the following command.
./mvnw spring-boot:test-run
The app starts within a couple of seconds, with a startup log like the following.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.4.5)
2025-05-16T13:04:24.718+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : Starting DemoKafkaConsumerApplication using Java 21.0.6 with PID 36472 (/Users/toshiaki/git/demo-kafka-consumer/target/classes started by toshiaki in /Users/toshiaki/git/demo-kafka-consumer)
2025-05-16T13:04:24.718+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : No active profile set, falling back to 1 default profile: "default"
2025-05-16T13:04:25.037+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8082 (http)
2025-05-16T13:04:25.041+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2025-05-16T13:04:25.041+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.40]
2025-05-16T13:04:25.057+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2025-05-16T13:04:25.057+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 326 ms
2025-05-16T13:04:25.102+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.images.PullPolicy : Image pull policy will be performed by: DefaultPullPolicy()
2025-05-16T13:04:25.103+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.ImageNameSubstitutor : Image name substitution will be performed by: DefaultImageNameSubstitutor (composite of 'ConfigurationFileImageNameSubstitutor' and 'PrefixingImageNameSubstitutor')
2025-05-16T13:04:25.108+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Testcontainers version: 1.20.6
2025-05-16T13:04:25.170+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.d.DockerClientProviderStrategy : Loaded org.testcontainers.dockerclient.UnixSocketClientProviderStrategy from ~/.testcontainers.properties, will try it first
2025-05-16T13:04:25.255+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.d.DockerClientProviderStrategy : Found Docker environment with local Unix socket (unix:///var/run/docker.sock)
2025-05-16T13:04:25.255+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Docker host IP address is localhost
2025-05-16T13:04:25.267+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Connected to docker:
Server Version: 27.5.1
API Version: 1.47
Operating System: OrbStack
Total Memory: 96439 MB
2025-05-16T13:04:25.300+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Creating container for image: testcontainers/ryuk:0.11.0
2025-05-16T13:04:25.319+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.RegistryAuthLocator : Credential helper/store (docker-credential-osxkeychain) does not have credentials for https://index.docker.io/v1/
2025-05-16T13:04:25.401+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Container testcontainers/ryuk:0.11.0 is starting: 4db5d8b10e4204e5654150e6a653fe59c74def47ddca213ab76b48ef4f5a977b
2025-05-16T13:04:25.578+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Container testcontainers/ryuk:0.11.0 started in PT0.278707S
2025-05-16T13:04:25.580+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.RyukResourceReaper : Ryuk started - will monitor and terminate Testcontainers containers on JVM exit
2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Checking the system...
2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : ✔︎ Docker server version should be at least 1.6.0
2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Creating container for image: apache/kafka-native:latest
2025-05-16T13:04:25.609+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Container apache/kafka-native:latest is starting: 90fdfa0b9b144fa2905fca8b0ee467ca84cec471320a2bdfa61852f68bc79990
2025-05-16T13:04:25.949+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Container apache/kafka-native:latest started in PT0.367594S
2025-05-16T13:04:26.105+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 1 endpoint beneath base path '/actuator'
2025-05-16T13:04:26.130+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8082 (http) with context path '/'
2025-05-16T13:04:26.141+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [localhost:32769]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-demo-kafka-consumer-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
enable.metrics.push = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = demo-kafka-consumer
group.instance.id = null
group.protocol = classic
group.remote.assignor = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2025-05-16T13:04:26.153+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.8.1
2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 70d6ff42debf7e17
2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1747368266194
2025-05-16T13:04:26.201+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Subscribed to topic(s): demo
2025-05-16T13:04:26.208+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : Started DemoKafkaConsumerApplication in 1.598 seconds (process running for 1.703)
2025-05-16T13:04:26.295+09:00 WARN 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Error while fetching metadata with correlation id 2 : {demo=UNKNOWN_TOPIC_OR_PARTITION}
2025-05-16T13:04:26.295+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Cluster ID: 4L6g3nShT-eMCtK--X86sw
2025-05-16T13:04:26.411+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Discovered group coordinator localhost:32769 (id: 2147483646 rack: null)
2025-05-16T13:04:26.412+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.420+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Request joining group due to: need to re-join with the given member-id: consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee
2025-05-16T13:04:26.421+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.423+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.425+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Finished assignment for group at generation 1: {consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee=Assignment(partitions=[demo-0])}
2025-05-16T13:04:26.435+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.435+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Notifying assignor about the new Assignment(partitions=[demo-0])
2025-05-16T13:04:26.436+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] k.c.c.i.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Adding newly assigned partitions: demo-0
2025-05-16T13:04:26.440+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Found no committed offset for partition demo-0
2025-05-16T13:04:26.442+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Resetting offset for partition demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:32769 (id: 1 rack: null)], epoch=0}}.
2025-05-16T13:04:26.443+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : demo-kafka-consumer: partitions assigned: [demo-0]
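Testcontainers reports each container's startup time in the log as an ISO-8601 duration (for example `PT0.367594S` for kafka-native in the run above). As a minimal sketch, assuming you just want that value in milliseconds, the JDK's `java.time.Duration` can parse it directly. The class name `ContainerStartup` is a hypothetical helper for illustration, not part of Testcontainers.

```java
import java.time.Duration;

// Hypothetical helper: converts the "started in PT..." value from the log into milliseconds.
class ContainerStartup {

    // Parses an ISO-8601 duration such as "PT0.367594S" and truncates it to whole milliseconds.
    static long startupMillis(String isoDuration) {
        return Duration.parse(isoDuration).toMillis();
    }

    public static void main(String[] args) {
        // Values copied from the startup log above.
        System.out.println("kafka-native: " + startupMillis("PT0.367594S") + " ms"); // kafka-native: 367 ms
        System.out.println("ryuk: " + startupMillis("PT0.278707S") + " ms");         // ryuk: 278 ms
    }
}
```

In this run the Kafka container was ready in well under half a second, which is what makes the whole app start in about 1.6 seconds.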
Kafka is started from the kafka-native image, and the port it can be reached on is shown in the following log line.
bootstrap.servers = [localhost:32769]
Alternatively, you can check it with the following command.
$ docker ps | grep kafka-native
90fdfa0b9b14 apache/kafka-native:latest "sh -c 'while [ ! -f…" About a minute ago Up About a minute 0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp funny_pascal
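The ports column of `docker ps` has the form `host-address:host-port->container-port/tcp`, and Kafka listens on 9092 inside the container. If you want to pick out the host port programmatically, the following is a minimal sketch using only the JDK. `PublishedPort` and `hostPortFor` are hypothetical names introduced here for illustration, and the sketch only handles TCP mappings in the format shown above.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Docker or Testcontainers.
class PublishedPort {

    // Matches one mapping such as "0.0.0.0:32769->9092/tcp" or "[::]:32769->9092/tcp".
    private static final Pattern MAPPING = Pattern.compile(":(\\d+)->(\\d+)/tcp");

    // Returns the first host port that is mapped to the given container port, if any.
    static OptionalInt hostPortFor(String portsColumn, int containerPort) {
        Matcher m = MAPPING.matcher(portsColumn);
        while (m.find()) {
            if (Integer.parseInt(m.group(2)) == containerPort) {
                return OptionalInt.of(Integer.parseInt(m.group(1)));
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Ports column copied from the docker ps output above.
        String ports = "0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp";
        System.out.println(hostPortFor(ports, 9092)); // OptionalInt[32769]
    }
}
```

The host port is assigned when the container starts, so the value will likely differ from 32769 in your environment.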
Send some messages using the Kafka CLI.
The Kafka CLI can be installed with brew.
brew install kafka
Send messages with the following command (replace 32769 with the port shown in your environment).
cat <<EOF | kafka-console-producer --bootstrap-server localhost:32769 --topic demo
Hello World 1
Hello World 2
Hello World 3
EOF
The app log shows that the messages were received.
2025-05-16T13:08:02.225+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World 1)
2025-05-16T13:08:02.225+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World 2)
2025-05-16T13:08:02.225+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World 3)
The messages can also be retrieved over HTTP with the following command.
$ curl -s http://localhost:8082 | jq .
[
"Hello World 1",
"Hello World 2",
"Hello World 3"
]
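The same check can be done from Java instead of curl. The following is a minimal sketch using the JDK's built-in `java.net.http.HttpClient`, assuming the app is running on port 8082 as configured in application.properties. `MessagesClient` is a hypothetical class name introduced for illustration and is not part of the generated project.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the GET endpoint exposed by ConsumerController.
class MessagesClient {

    // Builds a plain GET request for the given endpoint.
    static HttpRequest request(URI endpoint) {
        return HttpRequest.newBuilder(endpoint).GET().build();
    }

    // Sends the request and returns the raw JSON body (a JSON array of strings).
    static String fetchMessages(URI endpoint) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(request(endpoint), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected status: " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            System.out.println(fetchMessages(URI.create("http://localhost:8082/")));
        } catch (IOException e) {
            // Most likely the app is not running or is listening on a different port.
            System.out.println("Could not fetch messages: " + e);
        }
    }
}
```

The body is returned as raw JSON text; parsing it into a list is left out to keep the sketch dependency-free.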
With kafka-native, we were able to set up a Spring Boot development environment with near-instant access to Kafka.
Testcontainers makes it easy to bring up Kafka not only in tests but also for local development, so give it a try.