---
title: Spring Cloud Streamでマイクロサービス間メッセージング
tags: ["Spring Boot", "Spring Cloud", "Spring Cloud Stream"]
categories: ["Programming", "Java", "org", "springframework", "cloud", "stream"]
date: 2016-01-06T09:18:59Z
updated: 2016-12-11T14:48:03Z
---
[Spring Cloud Stream](http://cloud.spring.io/spring-cloud-stream/)とはマイクロサービス間のメッセージングを簡単に実現するためのプロジェクトです。
Spring Integrationのマイクロサービス版のような位置付けです。
執筆時点のバージョンは`1.0.0.RELEASE`です。
下図のようなSource(データの送り元)、 Sink(データの受け皿)とよばれるモジュールとそれらをつなぐBinderから成ります。

このモジュールが1つのマイクロサービスにあたり、Spring Bootアプリケーションとなります。
また、Binderにはメッセージキューが使用され、[Kafka](http://kafka.apache.org/)、[RabbitMQ](https://www.rabbitmq.com/)、[Redis](http://redis.io/)(Redisは1.0.0には含まれていません)から選べます。
Unixのパイプのように表現すると、
source | sink
となります。`|`がBinderにあたります。
Source兼SinkであるProcessorも用意されており、
source | processor | processor | sink
というような使い方もできます。
Spring XDを使ったことがあれば、このあたりの用語に聞き覚えがあると思います。
このプロジェクトができた背景としてはSpring XDのマイクロサービス対応リファクタリング([Sprin Cloud Data Flow](http://cloud.spring.io/spring-cloud-dataflow/))があるのですが、これはまたいつか説明します。
### Spring Cloud Streamを使ってみる
習うより慣れろ、で、まずは動かしてみましょう。
BinderとしてKafkaを使います。
#### Kafkaの準備
ここではDockerでKafkaを立ち上げます。(開発用にローカルマシンを汚したくないため)
ローカルのKafkaを使っても勿論構わないので、その場合はホスト名を`localhost`にして読み替えてください。
``` console
($ docker-machine create dev --provider virtualbox) <-- unless docker machine is set up
$ docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip dev` --env ADVERTISED_PORT=9092 spotify/kafka
$ docker-machine ssh dev -f -N -L 9092:localhost:9092 # port forwarding for Kafka
$ docker-machine ssh dev -f -N -L 2181:localhost:2181 # port forwarding for ZooKeeper
```
#### Sourceの作成
[Spring Initializr](https://start.spring.io)でSource側のプロジェクトを作成しましょう。
「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。
「Artifact」に"demo-source"を入力して、「Generate Project」をクリック。

Mavenプロジェクトがzipでダウンロードできるので、zipを展開して、IDEにインポートしてください。
`DemoSourceApplication.java`にコードを追加して、Sourceのモジュールを作成します。
Sourceとして認識させるために、`org.springframework.cloud.stream.annotation.EnableBinding`アノテーションに`org.springframework.cloud.stream.messaging.Source`クラスを指定します[1]。
``` java
@SpringBootApplication
@EnableBinding(Source.class) // [1]
public class DemoSourceApplication {
// ...
}
```
ちなみに、`Source`クラスは次のような実装になっています。
``` java
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
```
Sourceからメッセージを送信する方法は、いろいろありますが、最も直感的なのは
`Source#output()`メソッドで得られる`MessageChannel`に`org.springframework.messaging.Message`オブジェクトを送る方法です。
次のコードでメッセージを送信する簡単な例を示します。
``` java
package com.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class DemoSourceApplication {
@Autowired
Source source; // [2]
@RequestMapping(path = "message")
void greet(@RequestBody String message) { // [3]
source.output().send(MessageBuilder.withPayload(message).build()); // [4]
}
public static void main(String[] args) {
SpringApplication.run(DemoSourceApplication.class, args);
}
}
```
バインドされた`Source`オブジェクトはインジェクション可能です[2]。
この`Source`オブジェクトを使う処理をSpring MVCのHTTPエンドポイント内に実装します[3]。
`Message`は`MessageBuilder`で作成するのが簡単です[4]。作成した`Message`を`MessageChannel#send`メソッドで送信します。
ちなみに、`Message`クラスはSpring Framework本体に含まれているメッセージ抽象化インターフェースであり、Springの色々なプロジェクトで使用されています。
`DemoSourceApplication`を実行する前に、`application.properties`にいくつか設定を行います。
``` properties
# [6]
spring.cloud.stream.bindings.output.destination=demo
# [7]
server.port=9000
```
メッセージの宛先名は`spring.cloud.stream.bindings.output.destination`プロパティに設定します[6]。
[7]のサーバーポート番号の指定は必須ではないですが、このあとSinkモジュールも起動するので、重複しないように今のうちに変えておきます。
では`DemoSourceApplication`を実行してください。
`kafkacat`コマンドでメッセージを確認しましょう。(`brew install kafkacat`でインストールできます)
``` console
$ kafkacat -C -b localhost:9092 -t demo
```
Sourceにリクエストを送ると、
``` console
$ curl -H "Content-Type: text/plain" -d Hello localhost:9000/message
```
`kafkacat`側には以下のような出力が表示されます。
``` console
?
contentType
"text/plain"Hello
```
Sinkがまだできていないため、送られたメッセージは処理されません。
次にSinkを作成しましょう。
Sourceからメッセージを送信する他の方法はSinkを作成した後に紹介します。
#### Sinkの作成
続いて、Sink側のプロジェクトを作成しましょう。Source同様に[Spring Initializr](https://start.spring.io)で、「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。
「Artifact」に"demo-sink"を入力して、「Generate Project」をクリック

ダウンロードしたzipファイルを展開し、IDEにインポートします。
今度はSinkとして認識させるために、`EnableBinding`アノテーションに`org.springframework.cloud.stream.messaging.Sink`クラスを指定します[8]。
``` java
@SpringBootApplication
@EnableBinding(Sink.class) // [8]
public class DemoSinkApplication {
// ...
}
```
`Sink`クラスは`Source`クラス同様に次のような実装になっています。
``` java
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
```
`Source`から送られるメッセージを処理する方法も色々あるのですが、
一番簡単な`org.springframework.cloud.stream.annotation.StreamListener`アノテーションをメソッドにつける方法を紹介します。
``` java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
@SpringBootApplication
@EnableBinding(Sink.class)
public class DemoSinkApplication {
@StreamListener(Sink.INPUT) // [10]
public void log(Message message) { // [11]
System.out.println("received " + message);
}
public static void main(String[] args) {
SpringApplication.run(DemoSinkApplication.class, args);
}
}
```
メッセージを処理するメソッドに`@StreamListener`アノテーションをつけます。`value`属性には`Sink.INPUT`を指定します[10]。
メソッドの引数には`Message`をとります[11]。
`@StreamListener`メソッドの引数は`Message`クラスではなく、メッセージの中身(Payload)だけとることもできます[12]。この場合はSpringのAPIを明示的に使わなくてもSinkモジュールを作成できます。
``` java
@StreamListener(Sink.INPUT) // [10]
public void log(String message) { // [11]
System.out.println("received " + message);
}
```
引数に`@Header("ヘッダー名")`を付ければヘッダーの値を取ることもできます。
`application.properties`にSource同様、メッセージの宛先名[14]、アプリケーションのポート番号[16]を設定します。`spring.cloud.stream.bindings.output.destination`と`spring.cloud.stream.bindings.input.destination`の値は同じでないと繋がりません。Consumer Group名`hello`を`spring.cloud.stream.bindings.input.group`に指定します[15]。この値を設定しないと、同じSinkをスケールアウトした際に、同じメッセージが全てのノードに流れてしまいます。
``` properties
# [14]
spring.cloud.stream.bindings.input.destination=demo
# [15]
spring.cloud.stream.bindings.input.group=hello
# [16]
server.port=9001
```
`DemoSinkApplication`を実行し、Source側にリクエストを送ると、
``` console
$ curl -H "Content-Type: text/plain" -d Spring localhost:9000/message
```
Sink側にログが出力されます。
``` console
received GenericMessage [payload=Spring, headers={kafka_offset=2, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=16, contentType=text/plain}]
```
これで`source | sink`ができました。
Processorを挟む例はまた今度。基本的には
``` java
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
```
をつけて、一度受信した後にまた次へ送信するモジュールを挟む感じです。
サンプルは[こちら](https://github.com/spring-cloud/spring-cloud-stream/tree/master/spring-cloud-stream-samples/transform)。
### Sourceの修正
Source側でメッセージを送信する別の方法をみてみましょう。先ほどは`MessageChannel`を使って任意のタイミングでメッセージを送信しましたが、`@InboundChannelAdapter`アノテーションを使ってメッセージ生成方法を指定する方法があります。
`Message`オブジェクトを生成するための`org.springframework.integration.core.MessageSource`に`@InboundChannelAdapter`をつけてBean定義すれば、定期的に`MessageSource#receive`メソッドが呼ばれ、生成された`Message`が送信されます。
(`receive`というメソッド名に違和感がありますが、[javadoc](http://docs.spring.io/spring-integration/api/org/springframework/integration/core/MessageSource.html#receive--)にはSourceから次のメッセージを取り出すためのメソッドと書かれています。)
`DemoSourceApplication`を次のように書き換えます。
``` java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@SpringBootApplication
@EnableBinding(Source.class)
public class DemoSourceApplication {
@Bean
@InboundChannelAdapter(value= Source.OUTPUT, poller= @Poller(fixedRate = "1000", maxMessagesPerPoll= "1")) // [16]
MessageSource source() {
return new MessageSource() {
@Override
public Message receive() {
return MessageBuilder.withPayload("Hello!").build();
}
};
// もちろん return () -> MessageBuilder.withPayload("Hello!").build(); でもOK
}
public static void main(String[] args) {
SpringApplication.run(DemoSourceApplication.class, args);
}
}
```
`@InboundChannelAdapter`に送信先を、`@Poller`で送信間隔と一回のタイミングで何通`Message`を送るか(=`receive`メソッドを何回呼ぶか)を指定できます。
この`DemoSourceApplication`を実行すると、Sink側に次のようなログが出力され、定期的にメッセージを受信していることがわかります。
``` console
received GenericMessage [payload=Hello!, headers={kafka_offset=3, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=54, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=4, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=55, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=5, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=56, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=6, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=57, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=7, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=58, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=8, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=59, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=9, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=60, contentType=text/plain}]
...
```
`@InboundChannelAdapter`はPOJOにもつけることができます。次の例のように、DIコンテナに管理されているコンポーネントのメソッドに`@InboundChannelAdapter`をつけると、そのメソッドが定期的に実行され、メッセージが送信されます。メソッドの返り値が`Message`以外のクラスである場合は、返り値のオブジェクトがメッセージの本文(Payload)に設定されます。
``` java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.stereotype.Component;
@SpringBootApplication
@EnableBinding(Source.class)
public class DemoSourceApplication {
@Component
public static class Greeter {
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "1000", maxMessagesPerPoll = "1"))
public String greet() {
return "Hello!";
}
}
public static void main(String[] args) {
SpringApplication.run(DemoSourceApplication.class, args);
}
}
```
### Event Driven Microservices with Sprgin Cloud Stream
詳しくはJJUG CCC 2016 Fallで話しました。Spring Cloud Streamやるなら必見。
### Data Microservices with Spring Cloud Stream, Task, Data Flow
Spring Cloud Streamが生まれる背景となるSpring Cloud Data Flowに関してはSpring Day 2016で話しました。こちらも必見。
### チュートリアル
書いた
https://github.com/Pivotal-Japan/spring-cloud-stream-tutorial/blob/master/README.md