Spring Cloud Streamとはマイクロサービス間のメッセージングを簡単に実現するためのプロジェクトです。 Spring Integrationのマイクロサービス版のような位置付けです。
執筆時点のバージョンは1.0.0.RELEASE
です。
下図のようなSource(データの送り元)、 Sink(データの受け皿)とよばれるモジュールとそれらをつなぐBinderから成ります。
このモジュールが1つのマイクロサービスにあたり、Spring Bootアプリケーションとなります。
また、Binderにはメッセージキューが使用され、Kafka、RabbitMQ、Redis(Redisは1.0.0には含まれていません)から選べます。
Unixのパイプのように表現すると、
source | sink
となります。|
がBinderにあたります。
Source兼SinkであるProcessorも用意されており、
source | processor | processor | sink
というような使い方もできます。
Spring XDを使ったことがあれば、このあたりの用語に聞き覚えがあると思います。 このプロジェクトができた背景としてはSpring XDのマイクロサービス対応リファクタリング(Sprin Cloud Data Flow)があるのですが、これはまたいつか説明します。
Spring Cloud Streamを使ってみる
習うより慣れろ、で、まずは動かしてみましょう。 BinderとしてKafkaを使います。
Kafkaの準備
ここではDockerでKafkaを立ち上げます。(開発用にローカルマシンを汚したくないため)
ローカルのKafkaを使っても勿論構わないので、その場合はホスト名をlocalhost
にして読み替えてください。
($ docker-machine create dev --provider virtualbox) <-- unless docker machine is set up
$ docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip dev` --env ADVERTISED_PORT=9092 spotify/kafka
$ docker-machine ssh dev -f -N -L 9092:localhost:9092 # port forwarding for Kafka
$ docker-machine ssh dev -f -N -L 2181:localhost:2181 # port forwarding for ZooKeeper
Sourceの作成
Spring InitializrでSource側のプロジェクトを作成しましょう。
「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。 「Artifact」に"demo-source"を入力して、「Generate Project」をクリック。
Mavenプロジェクトがzipでダウンロードできるので、zipを展開して、IDEにインポートしてください。
DemoSourceApplication.java
にコードを追加して、Sourceのモジュールを作成します。
Sourceとして認識させるために、org.springframework.cloud.stream.annotation.EnableBinding
アノテーションにorg.springframework.cloud.stream.messaging.Source
クラスを指定します[1]。
@SpringBootApplication
@EnableBinding(Source.class) // [1]
public class DemoSourceApplication {
// ...
}
ちなみに、Source
クラスは次のような実装になっています。
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
Sourceからメッセージを送信する方法は、いろいろありますが、最も直感的なのは
Source#output()
メソッドで得られるMessageChannel
にorg.springframework.messaging.Message
オブジェクトを送る方法です。
次のコードでメッセージを送信する簡単な例を示します。
package com.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class DemoSourceApplication {
@Autowired
Source source; // [2]
@RequestMapping(path = "message")
void greet(@RequestBody String message) { // [3]
source.output().send(MessageBuilder.withPayload(message).build()); // [4]
}
public static void main(String[] args) {
SpringApplication.run(DemoSourceApplication.class, args);
}
}
バインドされたSource
オブジェクトはインジェクション可能です[2]。
このSource
オブジェクトを使う処理をSpring MVCのHTTPエンドポイント内に実装します[3]。
Message
はMessageBuilder
で作成するのが簡単です[4]。作成したMessage
をMessageChannel#send
メソッドで送信します。
ちなみに、Message
クラスはSpring Framework本体に含まれているメッセージ抽象化インターフェースであり、Springの色々なプロジェクトで使用されています。
# [6]
spring.cloud.stream.bindings.output.destination=demo
# [7]
server.port=9000
メッセージの宛先名はspring.cloud.stream.bindings.output.destination
プロパティに設定します[6]。
[7]のサーバーポート番号の指定は必須ではないですが、このあとSinkモジュールも起動するので、重複しないように今のうちに変えておきます。
ではDemoSourceApplication
を実行してください。
kafkacat
コマンドでメッセージを確認しましょう。(brew install kafkacat
でインストールできます)
$ kafkacat -C -b localhost:9092 -t demo
Sourceにリクエストを送ると、
$ curl -H "Content-Type: text/plain" -d Hello localhost:9000/message
kafkacat
側には以下のような出力が表示されます。
?
contentType
"text/plain"Hello
Sinkがまだできていないため、送られたメッセージは処理されません。 次にSinkを作成しましょう。
Sourceからメッセージを送信する他の方法はSinkを作成した後に紹介します。
Sinkの作成
続いて、Sink側のプロジェクトを作成しましょう。Source同様にSpring Initializrで、「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。 「Artifact」に"demo-sink"を入力して、「Generate Project」をクリック
ダウンロードしたzipファイルを展開し、IDEにインポートします。
今度はSinkとして認識させるために、EnableBinding
アノテーションにorg.springframework.cloud.stream.messaging.Sink
クラスを指定します[8]。
@SpringBootApplication
@EnableBinding(Sink.class) // [8]
public class DemoSinkApplication {
// ...
}
Sink
クラスはSource
クラス同様に次のような実装になっています。
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
Source
から送られるメッセージを処理する方法も色々あるのですが、
一番簡単なorg.springframework.cloud.stream.annotation.StreamListener
アノテーションをメソッドにつける方法を紹介します。
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
@SpringBootApplication
@EnableBinding(Sink.class)
public class DemoSinkApplication {
@StreamListener(Sink.INPUT) // [10]
public void log(Message<String> message) { // [11]
System.out.println("received " + message);
}
public static void main(String[] args) {
SpringApplication.run(DemoSinkApplication.class, args);
}
}
メッセージを処理するメソッドに@StreamListener
アノテーションをつけます。value
属性にはSink.INPUT
を指定します[10]。
メソッドの引数にはMessage
をとります[11]。
@StreamListener
メソッドの引数はMessage
クラスではなく、メッセージの中身(Payload)だけとることもできます[12]。この場合はSpringのAPIを明示的に使わなくてもSinkモジュールを作成できます。
@StreamListener(Sink.INPUT) // [10]
public void log(String message) { // [11]
System.out.println("received " + message);
}
引数に@Header("ヘッダー名")
を付ければヘッダーの値を取ることもできます。
application.properties
にSource同様、メッセージの宛先名[14]、アプリケーションのポート番号[16]を設定します。spring.cloud.stream.bindings.output.destination
とspring.cloud.stream.bindings.input.destination
の値は同じでないと繋がりません。Consumer Group名hello
をspring.cloud.stream.bindings.input.group
に指定します[15]。この値を設定しないと、同じSinkをスケールアウトした際に、同じメッセージが全てのノードに流れてしまいます。
# [14]
spring.cloud.stream.bindings.input.destination=demo
# [15]
spring.cloud.stream.bindings.input.group=hello
# [16]
server.port=9001
DemoSinkApplication
を実行し、Source側にリクエストを送ると、
$ curl -H "Content-Type: text/plain" -d Spring localhost:9000/message
Sink側にログが出力されます。
received GenericMessage [payload=Spring, headers={kafka_offset=2, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=16, contentType=text/plain}]
これでsource | sink
ができました。
Processorを挟む例はまた今度。基本的には
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
をつけて、一度受信した後にまた次へ送信するモジュールを挟む感じです。
サンプルはこちら。
Sourceの修正
Source側でメッセージを送信する別の方法をみてみましょう。先ほどはMessageChannel
を使って任意のタイミングでメッセージを送信しましたが、@InboundChannelAdapter
アノテーションを使ってメッセージ生成方法を指定する方法があります。
Message
オブジェクトを生成するためのorg.springframework.integration.core.MessageSource
に@InboundChannelAdapter
をつけてBean定義すれば、定期的にMessageSource#receive
メソッドが呼ばれ、生成されたMessage
が送信されます。
(receive
というメソッド名に違和感がありますが、javadocにはSourceから次のメッセージを取り出すためのメソッドと書かれています。)
DemoSourceApplication
を次のように書き換えます。
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@SpringBootApplication
@EnableBinding(Source.class)
public class DemoSourceApplication {
@Bean
@InboundChannelAdapter(value= Source.OUTPUT, poller= @Poller(fixedRate = "1000", maxMessagesPerPoll= "1")) // [16]
MessageSource<String> source() {
return new MessageSource<String>() {
@Override
public Message<String> receive() {
return MessageBuilder.withPayload("Hello!").build();
}
};
// もちろん return () -> MessageBuilder.withPayload("Hello!").build(); でもOK
}
public static void main(String[] args) {
SpringApplication.run(DemoSourceApplication.class, args);
}
}
@InboundChannelAdapter
に送信先を、@Poller
で送信間隔と一回のタイミングで何通Message
を送るか(=receive
メソッドを何回呼ぶか)を指定できます。
このDemoSourceApplication
を実行すると、Sink側に次のようなログが出力され、定期的にメッセージを受信していることがわかります。
received GenericMessage [payload=Hello!, headers={kafka_offset=3, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=54, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=4, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=55, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=5, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=56, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=6, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=57, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=7, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=58, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=8, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=59, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=9, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=60, contentType=text/plain}]
...
@InboundChannelAdapter
はPOJOにもつけることができます。次の例のように、DIコンテナに管理されているコンポーネントのメソッドに@InboundChannelAdapter
をつけると、そのメソッドが定期的に実行され、メッセージが送信されます。メソッドの返り値がMessage
以外のクラスである場合は、返り値のオブジェクトがメッセージの本文(Payload)に設定されます。
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.stereotype.Component;
@SpringBootApplication
@EnableBinding(Source.class)
public class DemoSourceApplication {
@Component
public static class Greeter {
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "1000", maxMessagesPerPoll = "1"))
public String greet() {
return "Hello!";
}
}
public static void main(String[] args) {
SpringApplication.run(DemoSourceApplication.class, args);
}
}
Event Driven Microservices with Sprgin Cloud Stream
詳しくはJJUG CCC 2016 Fallで話しました。Spring Cloud Streamやるなら必見。
Data Microservices with Spring Cloud Stream, Task, Data Flow
Spring Cloud Streamが生まれる背景となるSpring Cloud Data Flowに関してはSpring Day 2016で話しました。こちらも必見。
チュートリアル
書いた
https://github.com/Pivotal-Japan/spring-cloud-stream-tutorial/blob/master/README.md