---
title: Upstashの Serverless KafkaにSpring Bootでアクセスするメモ
tags: ["Spring Boot", "Kafka", "Upstash", "Spring for Apache Kafka"]
categories: ["Programming", "Java", "org", "springframework", "kafka"]
date: 2024-07-01T02:58:12Z
updated: 2024-07-01T03:02:43Z
---
[Upstash](https://upstash.com/)の[Serverless Kafka](https://upstash.com/docs/kafka/overall/getstarted)にSpring Bootからアクセスしてみます。
### Upstashアカウントの作成
https://upstash.com/kafka から"Free"の"Start Now"をクリック
Google(等)でログイン
これだけでアカウント作成完了。
### Kafkaクラスタの作成
ダッシュボードから"Kafka"を選択
”Create Cluster"ボタンをクリックし、名前などを入力
Topic名を入力して、"Create Topic"ボタンをクリック
これでKafkaの準備ができました。
### Producerアプリの作成
Spring Initializrでアプリの雛形を作成します。
```
curl https://start.spring.io/starter.tgz \
-d artifactId=demo-kafka-producer \
-d baseDir=demo-kafka-producer \
-d packageName=com.example \
-d dependencies=kafka,testcontainers,web,actuator \
-d type=maven-project \
-d applicationName=DemoKafkaProducerApplication | tar -xzvf -
cd demo-kafka-producer
```
Producer用のサンプルコードを作成します。
```java
cat <<'EOF' > src/main/java/com/example/ProducerController.java
package com.example;
import java.util.concurrent.CompletableFuture;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
private final KafkaTemplate kafkaTemplate;
private final String topic;
public ProducerController(KafkaTemplate kafkaTemplate, @Value("${demo.topic}") String topic) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
@PostMapping(path = "/")
public CompletableFuture hello(@RequestBody String message) {
CompletableFuture> result = this.kafkaTemplate.send(this.topic, message);
return result.thenApply(r -> r.getProducerRecord().toString());
}
}
EOF
```
Kafka環境に依らないプロパティを`application.properties`に定義します。
```properties
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.shutdown=graceful
spring.application.name=demo-kafka-producer
EOF
```
Upstash環境に依存するプロパティを`application-upstash.properties`に定義します。
接続するための情報はダッシュボードから確認できます。
以下の
* `spring.kafka.bootstrap-servers`
* `spring.kafka.jaas.options.password`
* `spring.kafka.jaas.options.username`
を変更してください。
```properties
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.producer.security.protocol=SASL_SSL
EOF
```
アプリをビルドし、実行します。profileを`upstash`に設定して、Upstashに接続できるようにします。
```
./mvnw clean package -DskipTests
java -jar target/demo-kafka-producer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash
```
### Consumerアプリの作成
Spring Initializrでアプリの雛形を作成します。
```
curl https://start.spring.io/starter.tgz \
-d artifactId=demo-kafka-consumer \
-d baseDir=demo-kafka-consumer \
-d packageName=com.example \
-d dependencies=kafka,testcontainers,web,actuator \
-d type=maven-project \
-d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer
```
Consumer用のサンプルコードを作成します。
```java
cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ConsumerController {
private final List messages = new CopyOnWriteArrayList<>();
private final Logger log = LoggerFactory.getLogger(ConsumerController.class);
@GetMapping(path = "")
public List getMessages() {
return this.messages;
}
@KafkaListener(topics = "${demo.topic}")
public void onMessage(String message) {
log.info("onMessage({})", message);
this.messages.add(message);
}
}
EOF
```
Kafka環境に依らないプロパティを`application.properties`に定義します。
```properties
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
server.shutdown=graceful
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF
```
Upstash環境に依存するプロパティを`application-upstash.properties`に定義します。
以下の
* `spring.kafka.bootstrap-servers`
* `spring.kafka.jaas.options.password`
* `spring.kafka.jaas.options.username`
を変更してください。
```properties
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.consumer.security.protocol=SASL_SSL
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
EOF
```
アプリをビルドし、実行します。profileを`upstash`に設定して、Upstashに接続できるようにします。
```
./mvnw clean package -DskipTests
java -jar target/demo-kafka-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash
```
### 動作確認
Producerアプリにリクエストを送ります。
```
curl localhost:8080 -H "Content-Type: text/plain" -d "Hello World"
curl localhost:8080 -H "Content-Type: text/plain" -d "Test"
```
Consumer側に次のようなログが出力されればOKです。
```
2024-06-30T02:50:22.952+09:00 INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World)
2024-06-30T02:50:29.202+09:00 INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Test)
```
また、送ったメッセージはConsumerアプリののAPIから取得できます。
```
$ curl localhost:8082
["Hello World","Test"]
```