--- title: Spring Cloud Streamでマイクロサービス間メッセージング tags: ["Spring Boot", "Spring Cloud", "Spring Cloud Stream"] categories: ["Programming", "Java", "org", "springframework", "cloud", "stream"] date: 2016-01-06T09:18:59Z updated: 2016-12-11T14:48:03Z --- [Spring Cloud Stream](http://cloud.spring.io/spring-cloud-stream/)とはマイクロサービス間のメッセージングを簡単に実現するためのプロジェクトです。 Spring Integrationのマイクロサービス版のような位置付けです。 執筆時点のバージョンは`1.0.0.RELEASE`です。 下図のようなSource(データの送り元)、 Sink(データの受け皿)とよばれるモジュールとそれらをつなぐBinderから成ります。 ![image](https://qiita-image-store.s3.amazonaws.com/0/1852/015157ad-dc48-5d53-a869-6174f077b32b.png) このモジュールが1つのマイクロサービスにあたり、Spring Bootアプリケーションとなります。 また、Binderにはメッセージキューが使用され、[Kafka](http://kafka.apache.org/)、[RabbitMQ](https://www.rabbitmq.com/)、[Redis](http://redis.io/)(Redisは1.0.0には含まれていません)から選べます。 Unixのパイプのように表現すると、 source | sink となります。`|`がBinderにあたります。 Source兼SinkであるProcessorも用意されており、 source | processor | processor | sink というような使い方もできます。 Spring XDを使ったことがあれば、このあたりの用語に聞き覚えがあると思います。 このプロジェクトができた背景としてはSpring XDのマイクロサービス対応リファクタリング([Sprin Cloud Data Flow](http://cloud.spring.io/spring-cloud-dataflow/))があるのですが、これはまたいつか説明します。 ### Spring Cloud Streamを使ってみる 習うより慣れろ、で、まずは動かしてみましょう。 BinderとしてKafkaを使います。 #### Kafkaの準備 ここではDockerでKafkaを立ち上げます。(開発用にローカルマシンを汚したくないため) ローカルのKafkaを使っても勿論構わないので、その場合はホスト名を`localhost`にして読み替えてください。 ``` console ($ docker-machine create dev --provider virtualbox) <-- unless docker machine is set up $ docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip dev` --env ADVERTISED_PORT=9092 spotify/kafka $ docker-machine ssh dev -f -N -L 9092:localhost:9092 # port forwarding for Kafka $ docker-machine ssh dev -f -N -L 2181:localhost:2181 # port forwarding for ZooKeeper ``` #### Sourceの作成 [Spring Initializr](https://start.spring.io)でSource側のプロジェクトを作成しましょう。 「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。 「Artifact」に"demo-source"を入力して、「Generate Project」をクリック。 ![image](https://qiita-image-store.s3.amazonaws.com/0/1852/61bf08d7-ef19-f378-45a1-79444857f923.png) Mavenプロジェクトがzipでダウンロードできるので、zipを展開して、IDEにインポートしてください。 `DemoSourceApplication.java`にコードを追加して、Sourceのモジュールを作成します。 Sourceとして認識させるために、`org.springframework.cloud.stream.annotation.EnableBinding`アノテーションに`org.springframework.cloud.stream.messaging.Source`クラスを指定します[1]。 ``` java @SpringBootApplication @EnableBinding(Source.class) // [1] public class DemoSourceApplication { // ... } ``` ちなみに、`Source`クラスは次のような実装になっています。 ``` java package org.springframework.cloud.stream.messaging; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output(); } ``` Sourceからメッセージを送信する方法は、いろいろありますが、最も直感的なのは `Source#output()`メソッドで得られる`MessageChannel`に`org.springframework.messaging.Message`オブジェクトを送る方法です。 次のコードでメッセージを送信する簡単な例を示します。 ``` java package com.example; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding(Source.class) @RestController public class DemoSourceApplication { @Autowired Source source; // [2] @RequestMapping(path = "message") void greet(@RequestBody String message) { // [3] source.output().send(MessageBuilder.withPayload(message).build()); // [4] } public static void main(String[] args) { SpringApplication.run(DemoSourceApplication.class, args); } } ``` バインドされた`Source`オブジェクトはインジェクション可能です[2]。 この`Source`オブジェクトを使う処理をSpring MVCのHTTPエンドポイント内に実装します[3]。 `Message`は`MessageBuilder`で作成するのが簡単です[4]。作成した`Message`を`MessageChannel#send`メソッドで送信します。 ちなみに、`Message`クラスはSpring Framework本体に含まれているメッセージ抽象化インターフェースであり、Springの色々なプロジェクトで使用されています。
SpringOne 2GX 2014 参加報告 & Spring 4.1について #jsug from Toshiaki Maki
`DemoSourceApplication`を実行する前に、`application.properties`にいくつか設定を行います。 ``` properties # [6] spring.cloud.stream.bindings.output.destination=demo # [7] server.port=9000 ``` メッセージの宛先名は`spring.cloud.stream.bindings.output.destination`プロパティに設定します[6]。 [7]のサーバーポート番号の指定は必須ではないですが、このあとSinkモジュールも起動するので、重複しないように今のうちに変えておきます。 では`DemoSourceApplication`を実行してください。 `kafkacat`コマンドでメッセージを確認しましょう。(`brew install kafkacat`でインストールできます) ``` console $ kafkacat -C -b localhost:9092 -t demo ``` Sourceにリクエストを送ると、 ``` console $ curl -H "Content-Type: text/plain" -d Hello localhost:9000/message ``` `kafkacat`側には以下のような出力が表示されます。 ``` console ? contentType "text/plain"Hello ``` Sinkがまだできていないため、送られたメッセージは処理されません。 次にSinkを作成しましょう。 Sourceからメッセージを送信する他の方法はSinkを作成した後に紹介します。 #### Sinkの作成 続いて、Sink側のプロジェクトを作成しましょう。Source同様に[Spring Initializr](https://start.spring.io)で、「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。 「Artifact」に"demo-sink"を入力して、「Generate Project」をクリック ![image](https://qiita-image-store.s3.amazonaws.com/0/1852/edafe2d7-d372-8caf-e557-918eff87ede6.png) ダウンロードしたzipファイルを展開し、IDEにインポートします。 今度はSinkとして認識させるために、`EnableBinding`アノテーションに`org.springframework.cloud.stream.messaging.Sink`クラスを指定します[8]。 ``` java @SpringBootApplication @EnableBinding(Sink.class) // [8] public class DemoSinkApplication { // ... } ``` `Sink`クラスは`Source`クラス同様に次のような実装になっています。 ``` java package org.springframework.cloud.stream.messaging; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); } ``` `Source`から送られるメッセージを処理する方法も色々あるのですが、 一番簡単な`org.springframework.cloud.stream.annotation.StreamListener`アノテーションをメソッドにつける方法を紹介します。 ``` java package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; @SpringBootApplication @EnableBinding(Sink.class) public class DemoSinkApplication { @StreamListener(Sink.INPUT) // [10] public void log(Message message) { // [11] System.out.println("received " + message); } public static void main(String[] args) { SpringApplication.run(DemoSinkApplication.class, args); } } ``` メッセージを処理するメソッドに`@StreamListener`アノテーションをつけます。`value`属性には`Sink.INPUT`を指定します[10]。 メソッドの引数には`Message`をとります[11]。 `@StreamListener`メソッドの引数は`Message`クラスではなく、メッセージの中身(Payload)だけとることもできます[12]。この場合はSpringのAPIを明示的に使わなくてもSinkモジュールを作成できます。 ``` java @StreamListener(Sink.INPUT) // [10] public void log(String message) { // [11] System.out.println("received " + message); } ``` 引数に`@Header("ヘッダー名")`を付ければヘッダーの値を取ることもできます。 `application.properties`にSource同様、メッセージの宛先名[14]、アプリケーションのポート番号[16]を設定します。`spring.cloud.stream.bindings.output.destination`と`spring.cloud.stream.bindings.input.destination`の値は同じでないと繋がりません。Consumer Group名`hello`を`spring.cloud.stream.bindings.input.group`に指定します[15]。この値を設定しないと、同じSinkをスケールアウトした際に、同じメッセージが全てのノードに流れてしまいます。 ``` properties # [14] spring.cloud.stream.bindings.input.destination=demo # [15] spring.cloud.stream.bindings.input.group=hello # [16] server.port=9001 ``` `DemoSinkApplication`を実行し、Source側にリクエストを送ると、 ``` console $ curl -H "Content-Type: text/plain" -d Spring localhost:9000/message ``` Sink側にログが出力されます。 ``` console received GenericMessage [payload=Spring, headers={kafka_offset=2, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=16, contentType=text/plain}] ``` これで`source | sink`ができました。 Processorを挟む例はまた今度。基本的には ``` java @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) ``` をつけて、一度受信した後にまた次へ送信するモジュールを挟む感じです。 サンプルは[こちら](https://github.com/spring-cloud/spring-cloud-stream/tree/master/spring-cloud-stream-samples/transform)。 ### Sourceの修正 Source側でメッセージを送信する別の方法をみてみましょう。先ほどは`MessageChannel`を使って任意のタイミングでメッセージを送信しましたが、`@InboundChannelAdapter`アノテーションを使ってメッセージ生成方法を指定する方法があります。 `Message`オブジェクトを生成するための`org.springframework.integration.core.MessageSource`に`@InboundChannelAdapter`をつけてBean定義すれば、定期的に`MessageSource#receive`メソッドが呼ばれ、生成された`Message`が送信されます。 (`receive`というメソッド名に違和感がありますが、[javadoc](http://docs.spring.io/spring-integration/api/org/springframework/integration/core/MessageSource.html#receive--)にはSourceから次のメッセージを取り出すためのメソッドと書かれています。) `DemoSourceApplication`を次のように書き換えます。 ``` java package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import org.springframework.integration.core.MessageSource; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @SpringBootApplication @EnableBinding(Source.class) public class DemoSourceApplication { @Bean @InboundChannelAdapter(value= Source.OUTPUT, poller= @Poller(fixedRate = "1000", maxMessagesPerPoll= "1")) // [16] MessageSource source() { return new MessageSource() { @Override public Message receive() { return MessageBuilder.withPayload("Hello!").build(); } }; // もちろん return () -> MessageBuilder.withPayload("Hello!").build(); でもOK } public static void main(String[] args) { SpringApplication.run(DemoSourceApplication.class, args); } } ``` `@InboundChannelAdapter`に送信先を、`@Poller`で送信間隔と一回のタイミングで何通`Message`を送るか(=`receive`メソッドを何回呼ぶか)を指定できます。 この`DemoSourceApplication`を実行すると、Sink側に次のようなログが出力され、定期的にメッセージを受信していることがわかります。 ``` console received GenericMessage [payload=Hello!, headers={kafka_offset=3, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=54, contentType=text/plain}] received GenericMessage [payload=Hello!, headers={kafka_offset=4, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=55, contentType=text/plain}] received GenericMessage [payload=Hello!, headers={kafka_offset=5, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=56, contentType=text/plain}] received GenericMessage [payload=Hello!, headers={kafka_offset=6, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=57, contentType=text/plain}] received GenericMessage [payload=Hello!, headers={kafka_offset=7, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=58, contentType=text/plain}] received GenericMessage [payload=Hello!, headers={kafka_offset=8, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=59, contentType=text/plain}] received GenericMessage [payload=Hello!, headers={kafka_offset=9, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=60, contentType=text/plain}] ... ``` `@InboundChannelAdapter`はPOJOにもつけることができます。次の例のように、DIコンテナに管理されているコンポーネントのメソッドに`@InboundChannelAdapter`をつけると、そのメソッドが定期的に実行され、メッセージが送信されます。メソッドの返り値が`Message`以外のクラスである場合は、返り値のオブジェクトがメッセージの本文(Payload)に設定されます。 ``` java package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import org.springframework.stereotype.Component; @SpringBootApplication @EnableBinding(Source.class) public class DemoSourceApplication { @Component public static class Greeter { @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "1000", maxMessagesPerPoll = "1")) public String greet() { return "Hello!"; } } public static void main(String[] args) { SpringApplication.run(DemoSourceApplication.class, args); } } ``` ### Event Driven Microservices with Sprgin Cloud Stream 詳しくはJJUG CCC 2016 Fallで話しました。Spring Cloud Streamやるなら必見。
Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3 from Toshiaki Maki
### Data Microservices with Spring Cloud Stream, Task, Data Flow Spring Cloud Streamが生まれる背景となるSpring Cloud Data Flowに関してはSpring Day 2016で話しました。こちらも必見。
Data Microservices with Spring Cloud Stream, Task, and Data Flow #jsug #springday from Toshiaki Maki
### チュートリアル 書いた https://github.com/Pivotal-Japan/spring-cloud-stream-tutorial/blob/master/README.md