IK.AM

@making's tech note


Spring Cloud Streamでマイクロサービス間メッセージング

🗃 {Programming/Java/org/springframework/cloud/stream}
🏷 Spring Boot 🏷 Spring Cloud 🏷 Spring Cloud Stream 
🗓 Updated at 2016-12-11T14:48:03Z  🗓 Created at 2016-01-06T09:18:59Z   🌎 English Page

Spring Cloud Streamとはマイクロサービス間のメッセージングを簡単に実現するためのプロジェクトです。 Spring Integrationのマイクロサービス版のような位置付けです。

執筆時点のバージョンは1.0.0.RELEASEです。

下図のようなSource(データの送り元)、 Sink(データの受け皿)とよばれるモジュールとそれらをつなぐBinderから成ります。

image

このモジュールが1つのマイクロサービスにあたり、Spring Bootアプリケーションとなります。

また、Binderにはメッセージキューが使用され、KafkaRabbitMQRedis(Redisは1.0.0には含まれていません)から選べます。

Unixのパイプのように表現すると、

source | sink

となります。|がBinderにあたります。

Source兼SinkであるProcessorも用意されており、

source | processor | processor | sink

というような使い方もできます。

Spring XDを使ったことがあれば、このあたりの用語に聞き覚えがあると思います。 このプロジェクトができた背景としてはSpring XDのマイクロサービス対応リファクタリング(Sprin Cloud Data Flow)があるのですが、これはまたいつか説明します。

Spring Cloud Streamを使ってみる

習うより慣れろ、で、まずは動かしてみましょう。 BinderとしてKafkaを使います。

Kafkaの準備

ここではDockerでKafkaを立ち上げます。(開発用にローカルマシンを汚したくないため) ローカルのKafkaを使っても勿論構わないので、その場合はホスト名をlocalhostにして読み替えてください。

($ docker-machine create dev --provider virtualbox) <-- unless docker machine is set up
$ docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip dev` --env ADVERTISED_PORT=9092 spotify/kafka
$ docker-machine ssh dev -f -N -L 9092:localhost:9092 # port forwarding for Kafka
$ docker-machine ssh dev -f -N -L 2181:localhost:2181 # port forwarding for ZooKeeper

Sourceの作成

Spring InitializrでSource側のプロジェクトを作成しましょう。

「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。 「Artifact」に"demo-source"を入力して、「Generate Project」をクリック。

image

Mavenプロジェクトがzipでダウンロードできるので、zipを展開して、IDEにインポートしてください。 DemoSourceApplication.javaにコードを追加して、Sourceのモジュールを作成します。

Sourceとして認識させるために、org.springframework.cloud.stream.annotation.EnableBindingアノテーションにorg.springframework.cloud.stream.messaging.Sourceクラスを指定します[1]。

@SpringBootApplication
@EnableBinding(Source.class) // [1]
public class DemoSourceApplication {
   // ...
}

ちなみに、Sourceクラスは次のような実装になっています。

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    
    String OUTPUT = "output";
    
    @Output(Source.OUTPUT)
    MessageChannel output();

}

Sourceからメッセージを送信する方法は、いろいろありますが、最も直感的なのは Source#output()メソッドで得られるMessageChannelorg.springframework.messaging.Messageオブジェクトを送る方法です。

次のコードでメッセージを送信する簡単な例を示します。

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class DemoSourceApplication {

    @Autowired
    Source source; // [2]

    @RequestMapping(path = "message")
    void greet(@RequestBody String message) { // [3]
        source.output().send(MessageBuilder.withPayload(message).build()); // [4]
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoSourceApplication.class, args);
    }
}

バインドされたSourceオブジェクトはインジェクション可能です[2]。 このSourceオブジェクトを使う処理をSpring MVCのHTTPエンドポイント内に実装します[3]。 MessageMessageBuilderで作成するのが簡単です[4]。作成したMessageMessageChannel#sendメソッドで送信します。

ちなみに、MessageクラスはSpring Framework本体に含まれているメッセージ抽象化インターフェースであり、Springの色々なプロジェクトで使用されています。

`DemoSourceApplication`を実行する前に、`application.properties`にいくつか設定を行います。
# [6]
spring.cloud.stream.bindings.output.destination=demo
# [7]
server.port=9000

メッセージの宛先名はspring.cloud.stream.bindings.output.destinationプロパティに設定します[6]。 [7]のサーバーポート番号の指定は必須ではないですが、このあとSinkモジュールも起動するので、重複しないように今のうちに変えておきます。

ではDemoSourceApplicationを実行してください。

kafkacatコマンドでメッセージを確認しましょう。(brew install kafkacatでインストールできます)

$ kafkacat -C -b localhost:9092 -t demo

Sourceにリクエストを送ると、

$ curl -H "Content-Type: text/plain" -d Hello localhost:9000/message

kafkacat側には以下のような出力が表示されます。

?
 contentType
            "text/plain"Hello

Sinkがまだできていないため、送られたメッセージは処理されません。 次にSinkを作成しましょう。

Sourceからメッセージを送信する他の方法はSinkを作成した後に紹介します。

Sinkの作成

続いて、Sink側のプロジェクトを作成しましょう。Source同様にSpring Initializrで、「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。 「Artifact」に"demo-sink"を入力して、「Generate Project」をクリック

image

ダウンロードしたzipファイルを展開し、IDEにインポートします。

今度はSinkとして認識させるために、EnableBindingアノテーションにorg.springframework.cloud.stream.messaging.Sinkクラスを指定します[8]。

@SpringBootApplication
@EnableBinding(Sink.class) // [8]
public class DemoSinkApplication {
    // ...
}

SinkクラスはSourceクラス同様に次のような実装になっています。

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();

}

Sourceから送られるメッセージを処理する方法も色々あるのですが、 一番簡単なorg.springframework.cloud.stream.annotation.StreamListenerアノテーションをメソッドにつける方法を紹介します。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class DemoSinkApplication {

    @StreamListener(Sink.INPUT) // [10]
    public void log(Message<String> message) { // [11]
        System.out.println("received " + message);
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoSinkApplication.class, args);
    }
}

メッセージを処理するメソッドに@StreamListenerアノテーションをつけます。value属性にはSink.INPUTを指定します[10]。 メソッドの引数にはMessageをとります[11]。

@StreamListenerメソッドの引数はMessageクラスではなく、メッセージの中身(Payload)だけとることもできます[12]。この場合はSpringのAPIを明示的に使わなくてもSinkモジュールを作成できます。

    @StreamListener(Sink.INPUT) // [10]
    public void log(String message) { // [11]
        System.out.println("received " + message);
    }

引数に@Header("ヘッダー名")を付ければヘッダーの値を取ることもできます。

application.propertiesにSource同様、メッセージの宛先名[14]、アプリケーションのポート番号[16]を設定します。spring.cloud.stream.bindings.output.destinationspring.cloud.stream.bindings.input.destinationの値は同じでないと繋がりません。Consumer Group名hellospring.cloud.stream.bindings.input.groupに指定します[15]。この値を設定しないと、同じSinkをスケールアウトした際に、同じメッセージが全てのノードに流れてしまいます。

# [14]
spring.cloud.stream.bindings.input.destination=demo
# [15]
spring.cloud.stream.bindings.input.group=hello
# [16]
server.port=9001

DemoSinkApplicationを実行し、Source側にリクエストを送ると、

$ curl -H "Content-Type: text/plain" -d Spring localhost:9000/message

Sink側にログが出力されます。

received GenericMessage [payload=Spring, headers={kafka_offset=2, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=16, contentType=text/plain}]

これでsource | sinkができました。

Processorを挟む例はまた今度。基本的には

@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

をつけて、一度受信した後にまた次へ送信するモジュールを挟む感じです。

サンプルはこちら

Sourceの修正

Source側でメッセージを送信する別の方法をみてみましょう。先ほどはMessageChannelを使って任意のタイミングでメッセージを送信しましたが、@InboundChannelAdapterアノテーションを使ってメッセージ生成方法を指定する方法があります。

Messageオブジェクトを生成するためのorg.springframework.integration.core.MessageSource@InboundChannelAdapterをつけてBean定義すれば、定期的にMessageSource#receiveメソッドが呼ばれ、生成されたMessageが送信されます。 (receiveというメソッド名に違和感がありますが、javadocにはSourceから次のメッセージを取り出すためのメソッドと書かれています。)

DemoSourceApplicationを次のように書き換えます。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Source.class)
public class DemoSourceApplication {

    @Bean
    @InboundChannelAdapter(value= Source.OUTPUT, poller= @Poller(fixedRate = "1000", maxMessagesPerPoll= "1")) // [16]
    MessageSource<String> source() {
        return new MessageSource<String>() {
            @Override
            public Message<String> receive() {
                return MessageBuilder.withPayload("Hello!").build();
            }
        };
        // もちろん return () -> MessageBuilder.withPayload("Hello!").build(); でもOK
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoSourceApplication.class, args);
    }
}

@InboundChannelAdapterに送信先を、@Pollerで送信間隔と一回のタイミングで何通Messageを送るか(=receiveメソッドを何回呼ぶか)を指定できます。

このDemoSourceApplicationを実行すると、Sink側に次のようなログが出力され、定期的にメッセージを受信していることがわかります。

received GenericMessage [payload=Hello!, headers={kafka_offset=3, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=54, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=4, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=55, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=5, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=56, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=6, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=57, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=7, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=58, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=8, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=59, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=9, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=60, contentType=text/plain}]
...

@InboundChannelAdapterはPOJOにもつけることができます。次の例のように、DIコンテナに管理されているコンポーネントのメソッドに@InboundChannelAdapterをつけると、そのメソッドが定期的に実行され、メッセージが送信されます。メソッドの返り値がMessage以外のクラスである場合は、返り値のオブジェクトがメッセージの本文(Payload)に設定されます。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.stereotype.Component;

@SpringBootApplication
@EnableBinding(Source.class)
public class DemoSourceApplication {

    @Component
    public static class Greeter {
        @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "1000", maxMessagesPerPoll = "1"))
        public String greet() {
            return "Hello!";
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoSourceApplication.class, args);
    }
}

Event Driven Microservices with Sprgin Cloud Stream

詳しくはJJUG CCC 2016 Fallで話しました。Spring Cloud Streamやるなら必見。

Data Microservices with Spring Cloud Stream, Task, Data Flow

Spring Cloud Streamが生まれる背景となるSpring Cloud Data Flowに関してはSpring Day 2016で話しました。こちらも必見。

チュートリアル

書いた

https://github.com/Pivotal-Japan/spring-cloud-stream-tutorial/blob/master/README.md


✒️️ Edit  ⏰ History  🗑 Delete