---
title: Notes on accessing Upstash Serverless Kafka with Spring Boot
tags: ["Spring Boot", "Kafka", "Upstash", "Spring for Apache Kafka"]
categories: ["Programming", "Java", "org", "springframework", "kafka"]
date: 2024-07-01T02:58:12Z
updated: 2024-07-01T03:02:43Z
---

> ⚠️ This article was automatically translated by OpenAI (gpt-4o).
> It may be edited eventually, but please be aware that it may contain incorrect information at this time.

Let's try accessing [Upstash](https://upstash.com/)'s [Serverless Kafka](https://upstash.com/docs/kafka/overall/getstarted) from Spring Boot.

### Creating an Upstash Account

Go to https://upstash.com/kafka and click "Start Now" under "Free".

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/2d4c7e0b-f34b-4375-801a-688816bb2de7">

Log in with Google (or other options).

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/c2b51eac-46fc-485a-83ec-16f3cdaf596f">

That's it for account creation.

### Creating a Kafka Cluster

Select "Kafka" from the dashboard.

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/8eb430d0-e097-4e84-b93f-7af7bd17e3a1">

Click the "Create Cluster" button and enter the necessary details.

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/8516329b-0d33-481f-8683-60b5a6e3ca45">

Enter the topic name and click the "Create Topic" button.

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/823cecd6-5187-4c8a-92ff-dceb1aac5d58">

Now Kafka is ready.

### Creating a Producer Application

Create a skeleton application using Spring Initializr.

```
curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-producer \
       -d baseDir=demo-kafka-producer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaProducerApplication | tar -xzvf -
cd demo-kafka-producer
```

Create sample code for the producer.

```java
cat <<'EOF' > src/main/java/com/example/ProducerController.java
package com.example;

import java.util.concurrent.CompletableFuture;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
	private final KafkaTemplate<String, String> kafkaTemplate;

	private final String topic;

	public ProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${demo.topic}") String topic) {
		this.kafkaTemplate = kafkaTemplate;
		this.topic = topic;
	}

	@PostMapping(path = "/")
	public CompletableFuture<String> hello(@RequestBody String message) {
		CompletableFuture<SendResult<String, String>> result = this.kafkaTemplate.send(this.topic, message);
		return result.thenApply(r -> r.getProducerRecord().toString());
	}
}
EOF
```

Define properties that are independent of the Kafka environment in `application.properties`.

```properties
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.shutdown=graceful
spring.application.name=demo-kafka-producer
EOF
```

Define properties that depend on the Upstash environment in `application-upstash.properties`.

You can check the connection information from the dashboard.

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/1aec5747-66ee-438d-b97d-f0d1f5078ade">

Change the following:

* `spring.kafka.bootstrap-servers`
* `spring.kafka.jaas.options.password`
* `spring.kafka.jaas.options.username`

```properties
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.producer.security.protocol=SASL_SSL
EOF
```

Build and run the application. Set the profile to `upstash` to connect to Upstash.

```
./mvnw clean package -DskipTests
java -jar target/demo-kafka-producer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash
```

### Creating a Consumer Application

Create a skeleton application using Spring Initializr.

```
curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-consumer \
       -d baseDir=demo-kafka-consumer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer
```

Create sample code for the consumer.

```java
cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {
	private final List<String> messages = new CopyOnWriteArrayList<>();

	private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

	@GetMapping(path = "")
	public List<String> getMessages() {
		return this.messages;
	}

	@KafkaListener(topics = "${demo.topic}")
	public void onMessage(String message) {
		log.info("onMessage({})", message);
		this.messages.add(message);
	}
}
EOF
```

Define properties that are independent of the Kafka environment in `application.properties`.

```properties
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
server.shutdown=graceful
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF
```

Define properties that depend on the Upstash environment in `application-upstash.properties`.

Change the following:

* `spring.kafka.bootstrap-servers`
* `spring.kafka.jaas.options.password`
* `spring.kafka.jaas.options.username`

```properties
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.consumer.security.protocol=SASL_SSL
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
EOF
```

Build and run the application. Set the profile to `upstash` to connect to Upstash.

```
./mvnw clean package -DskipTests
java -jar target/demo-kafka-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash
```

### Testing time

Send requests to the producer application.

```
curl localhost:8080 -H "Content-Type: text/plain" -d "Hello World"
curl localhost:8080 -H "Content-Type: text/plain" -d "Test"   
```

If the following logs are output on the consumer side, it is working correctly.

```
2024-06-30T02:50:22.952+09:00  INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World)
2024-06-30T02:50:29.202+09:00  INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Test)
```

You can also retrieve the sent messages from the consumer application's API.

```
$ curl localhost:8082
["Hello World","Test"]
```
