Warning
This article was automatically translated by OpenAI (gpt-4.1). It may be edited later, but please be aware that it may currently contain incorrect information.
kafka-native is a Native Image of Apache Kafka compiled with GraalVM.
Because it is a native binary, the Kafka broker starts extremely quickly.
Combined with Testcontainers, it gives you a Spring Boot development environment in which Kafka is available almost instantly.
Incidentally, Testcontainers can be used not only for testing but also for local development.
Note
If you want to know how it works, please refer to this article.
Spring Initializr also generates the kafka-native configuration automatically when you create a project skeleton,
so setting up the environment described in the article title takes practically no effort (you only need Docker).
Creating a Consumer App
Create an app skeleton using Spring Initializr.
curl https://start.spring.io/starter.tgz \
    -d artifactId=demo-kafka-consumer \
    -d baseDir=demo-kafka-consumer \
    -d packageName=com.example \
    -d dependencies=kafka,testcontainers,web,actuator \
    -d type=maven-project \
    -d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer
The Testcontainers configuration is generated under src/test/java. It uses the apache/kafka-native:latest image.
$ cat src/test/java/com/example/TestDemoKafkaConsumerApplication.java
package com.example;

import org.springframework.boot.SpringApplication;

public class TestDemoKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.from(DemoKafkaConsumerApplication::main).with(TestcontainersConfiguration.class).run(args);
    }

}
$ cat src/test/java/com/example/TestcontainersConfiguration.java
package com.example;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {

    @Bean
    @ServiceConnection
    KafkaContainer kafkaContainer() {
        return new KafkaContainer(DockerImageName.parse("apache/kafka-native:latest"));
    }

}
Next, create the sample code for the consumer. It reuses the code created in this article.
cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {

    private final List<String> messages = new CopyOnWriteArrayList<>();

    private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

    @GetMapping(path = "")
    public List<String> getMessages() {
        return this.messages;
    }

    @KafkaListener(topics = "${demo.topic}")
    public void onMessage(String message) {
        log.info("onMessage({})", message);
        this.messages.add(message);
    }

}
EOF
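The controller stores received messages in a CopyOnWriteArrayList. The article does not say why, but a likely reason is that the Kafka listener thread appends to the list while HTTP request threads read it. The plain-Java sketch below (no Spring or Kafka involved; the class and method names are made up for illustration) shows the relevant property with a single thread: a CopyOnWriteArrayList iterates over a snapshot and tolerates appends during iteration, whereas an ArrayList iterator fails fast.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of the generated project.
final class SnapshotIterationDemo {

    // Appends one element per iteration step while iterating over the same list.
    // Returns true if the iteration completed, false if the list's iterator
    // detected the modification and threw ConcurrentModificationException.
    static boolean appendWhileIterating(List<String> messages) {
        try {
            for (String message : messages) {
                messages.add(message + " (copy)");
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> plain = new ArrayList<>(List.of("Hello World 1", "Hello World 2"));
        List<String> copyOnWrite = new CopyOnWriteArrayList<>(List.of("Hello World 1", "Hello World 2"));

        System.out.println("ArrayList iteration completed: " + appendWhileIterating(plain));
        System.out.println("CopyOnWriteArrayList iteration completed: " + appendWhileIterating(copyOnWrite));
    }
}
```

The trade-off is that every add copies the backing array, which is acceptable for a small demo list but would be a poor fit for a high-volume topic.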
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF
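The auto-offset-reset=earliest setting decides where the consumer starts reading when its group has no committed offset, which is always the case against a freshly started container. The following is a simplified model of that rule in plain Java. It is not Kafka client code, and the class, method, and parameter names are assumptions made for illustration.

```java
import java.util.OptionalLong;

// Hypothetical model of how a consumer picks its starting position.
// It ignores edge cases such as committed offsets that are out of range.
final class OffsetResetSketch {

    static long startingOffset(OptionalLong committedOffset, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        // A committed offset always wins; the reset policy is only a fallback.
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // read everything still in the partition
            case "latest":
                return logEndOffset;   // read only records produced from now on
            default:
                // The real client raises its own exception type for "none".
                throw new IllegalStateException(
                        "No committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A partition holding three records (offsets 0..2) and no committed offset.
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0, 3));
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0, 3));
        // After a commit at offset 2, the policy no longer matters.
        System.out.println(startingOffset(OptionalLong.of(2), "earliest", 0, 3));
    }
}
```

With earliest, messages that were produced before the consumer joined the group are still delivered, which is convenient for a local environment where the app may be restarted at any time.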
Verifying That It Works
Let's try out the app we've created so far using Testcontainers.
If you're using an IDE, just run the main method of TestDemoKafkaConsumerApplication under src/test/java.
If you want to use the Maven Plugin, you can run it with the following command:
./mvnw spring-boot:test-run
The app starts almost immediately and prints the following startup logs.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.4.5)
2025-05-16T13:04:24.718+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : Starting DemoKafkaConsumerApplication using Java 21.0.6 with PID 36472 (/Users/toshiaki/git/demo-kafka-consumer/target/classes started by toshiaki in /Users/toshiaki/git/demo-kafka-consumer)
2025-05-16T13:04:24.718+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : No active profile set, falling back to 1 default profile: "default"
2025-05-16T13:04:25.037+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8082 (http)
2025-05-16T13:04:25.041+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2025-05-16T13:04:25.041+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.40]
2025-05-16T13:04:25.057+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2025-05-16T13:04:25.057+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 326 ms
2025-05-16T13:04:25.102+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.images.PullPolicy : Image pull policy will be performed by: DefaultPullPolicy()
2025-05-16T13:04:25.103+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.ImageNameSubstitutor : Image name substitution will be performed by: DefaultImageNameSubstitutor (composite of 'ConfigurationFileImageNameSubstitutor' and 'PrefixingImageNameSubstitutor')
2025-05-16T13:04:25.108+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Testcontainers version: 1.20.6
2025-05-16T13:04:25.170+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.d.DockerClientProviderStrategy : Loaded org.testcontainers.dockerclient.UnixSocketClientProviderStrategy from ~/.testcontainers.properties, will try it first
2025-05-16T13:04:25.255+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.d.DockerClientProviderStrategy : Found Docker environment with local Unix socket (unix:///var/run/docker.sock)
2025-05-16T13:04:25.255+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Docker host IP address is localhost
2025-05-16T13:04:25.267+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Connected to docker:
Server Version: 27.5.1
API Version: 1.47
Operating System: OrbStack
Total Memory: 96439 MB
2025-05-16T13:04:25.300+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Creating container for image: testcontainers/ryuk:0.11.0
2025-05-16T13:04:25.319+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.RegistryAuthLocator : Credential helper/store (docker-credential-osxkeychain) does not have credentials for https://index.docker.io/v1/
2025-05-16T13:04:25.401+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Container testcontainers/ryuk:0.11.0 is starting: 4db5d8b10e4204e5654150e6a653fe59c74def47ddca213ab76b48ef4f5a977b
2025-05-16T13:04:25.578+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.testcontainers/ryuk:0.11.0 : Container testcontainers/ryuk:0.11.0 started in PT0.278707S
2025-05-16T13:04:25.580+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.t.utility.RyukResourceReaper : Ryuk started - will monitor and terminate Testcontainers containers on JVM exit
2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : Checking the system...
2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] org.testcontainers.DockerClientFactory : ✔︎ Docker server version should be at least 1.6.0
2025-05-16T13:04:25.581+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Creating container for image: apache/kafka-native:latest
2025-05-16T13:04:25.609+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Container apache/kafka-native:latest is starting: 90fdfa0b9b144fa2905fca8b0ee467ca84cec471320a2bdfa61852f68bc79990
2025-05-16T13:04:25.949+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] tc.apache/kafka-native:latest : Container apache/kafka-native:latest started in PT0.367594S
2025-05-16T13:04:26.105+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 1 endpoint beneath base path '/actuator'
2025-05-16T13:04:26.130+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8082 (http) with context path '/'
2025-05-16T13:04:26.141+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [localhost:32769]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-demo-kafka-consumer-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
enable.metrics.push = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = demo-kafka-consumer
group.instance.id = null
group.protocol = classic
group.remote.assignor = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2025-05-16T13:04:26.153+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.8.1
2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 70d6ff42debf7e17
2025-05-16T13:04:26.194+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1747368266194
2025-05-16T13:04:26.201+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Subscribed to topic(s): demo
2025-05-16T13:04:26.208+09:00 INFO 36472 --- [demo-kafka-consumer] [ main] c.example.DemoKafkaConsumerApplication : Started DemoKafkaConsumerApplication in 1.598 seconds (process running for 1.703)
2025-05-16T13:04:26.295+09:00 WARN 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Error while fetching metadata with correlation id 2 : {demo=UNKNOWN_TOPIC_OR_PARTITION}
2025-05-16T13:04:26.295+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Cluster ID: 4L6g3nShT-eMCtK--X86sw
2025-05-16T13:04:26.411+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Discovered group coordinator localhost:32769 (id: 2147483646 rack: null)
2025-05-16T13:04:26.412+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.420+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Request joining group due to: need to re-join with the given member-id: consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee
2025-05-16T13:04:26.421+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.423+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.425+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Finished assignment for group at generation 1: {consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee=Assignment(partitions=[demo-0])}
2025-05-16T13:04:26.435+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.435+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Notifying assignor about the new Assignment(partitions=[demo-0])
2025-05-16T13:04:26.436+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] k.c.c.i.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Adding newly assigned partitions: demo-0
2025-05-16T13:04:26.440+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Found no committed offset for partition demo-0
2025-05-16T13:04:26.442+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Resetting offset for partition demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:32769 (id: 1 rack: null)], epoch=0}}.
2025-05-16T13:04:26.443+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : demo-kafka-consumer: partitions assigned: [demo-0]
Kafka has been started from the kafka-native image. Testcontainers publishes the broker on a randomly assigned host port, which you can read from the following log line:
bootstrap.servers = [localhost:32769]
Alternatively, you can check it with the following command:
$ docker ps | grep kafka-native
90fdfa0b9b14 apache/kafka-native:latest "sh -c 'while [ ! -f…" About a minute ago Up About a minute 0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp funny_pascal
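If you want to script this lookup instead of reading the port by eye, the host port can be pulled out of a `docker ps` line with a regular expression. This is only a sketch: the class and method names are hypothetical, and it assumes the broker listens on container port 9092 as in the output above.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the generated project.
final class KafkaPortFinder {

    // Matches a published port such as "0.0.0.0:32769->9092/tcp" and captures
    // the host side of the mapping.
    private static final Pattern MAPPING = Pattern.compile(":(\\d+)->9092/tcp");

    // Returns the first host port mapped to container port 9092, if any.
    static OptionalInt hostPort(String dockerPsLine) {
        Matcher matcher = MAPPING.matcher(dockerPsLine);
        return matcher.find()
                ? OptionalInt.of(Integer.parseInt(matcher.group(1)))
                : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Abridged version of the `docker ps` line shown above.
        String line = "90fdfa0b9b14 apache/kafka-native:latest "
                + "0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp funny_pascal";
        hostPort(line).ifPresent(port -> System.out.println("localhost:" + port));
    }
}
```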
Next, send messages using the Kafka CLI.
You can install it with Homebrew.
brew install kafka
Send messages with the following command, replacing 32769 with the port assigned on your machine:
cat <<EOF | kafka-console-producer --bootstrap-server localhost:32769 --topic demo
Hello World 1
Hello World 2
Hello World 3
EOF
If you check the app's logs, you can see that the messages have been received.
2025-05-16T13:08:02.225+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World 1)
2025-05-16T13:08:02.225+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World 2)
2025-05-16T13:08:02.225+09:00 INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World 3)
You can also retrieve the messages via HTTP with the following command:
$ curl -s http://localhost:8082 | jq .
[
"Hello World 1",
"Hello World 2",
"Hello World 3"
]
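The same check can be made from Java with the JDK's built-in HTTP client (Java 11 or later). This is a minimal sketch: the class and method names are hypothetical, and the URL assumes the app is listening on port 8082 as configured in application.properties.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client, not part of the generated project.
final class MessagesClient {

    // Sends a GET request to the given URL and returns the response body as text.
    static String fetchMessages(String url) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected status " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Requires the consumer app to be running locally.
        System.out.println(fetchMessages("http://localhost:8082/"));
    }
}
```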
With kafka-native, we built a Spring Boot development environment in which Kafka is available almost instantly.
Testcontainers makes it easy to spin up Kafka in a local development environment as well as in tests, which is extremely convenient, so be sure to give it a try.