Dapr Advent Calendar 5日目 - Daprでメッセージング
こんにちは、Dapr Advent Calendar 5日目です。日曜日なのでゆっくりエントリーを書きました。
Pub/Subメッセージングをしてみよう
今回はDaprを使ってキューを使ったメッセージングをします。このメッセージング機能(Pub/sub API)とサービス呼び出し(Invoke API)の2つが、僕がDaprを使おうという決め手になったくらい重要機能だと思っています。
Pub/sub機能についてのドキュメントはこちらにあります。
今回作成するアプリケーションのソースコードはgithubに置いてあります。
https://github.com/cero-t/dapr-advent-2021/
今回は、このリポジトリにある「publish」と「subscribe」モジュールを作ります。
メッセージを送る側のアプリケーションの作成 (publish)
publish側のコントローラークラスの作成
まずはメッセージを送る側、つまりキューに対してエンキューする側のWebアプリケーション(publishモジュール)を作成します。
メッセージを送るために、DaprのPub/sub APIを使います。
(publish) PublishController.java
@RestController public class PublishController { private RestTemplate restTemplate; @Value("${pubsubUrl}") private String pubsubUrl; public PublishController(RestTemplate restTemplate) { this.restTemplate = restTemplate; } @PostMapping("/publish") public void publish(@RequestBody Object message) { restTemplate.postForObject(pubsubUrl, message, Void.class); } }
publish
メソッドにて、リクエストで受け取ったオブジェクトをそのまま pubsuburl
(DaprのPub/sub APIのURL)に投げているだけです。
publish側のアプリケーション設定ファイルの作成
前回のエントリーではURLをソースコードに直書きしましたが、今回は環境への依存性を下げるためURLを設定ファイルに書くことにしました。
(publish) application.properties
server.port=8083 pubsubUrl=http://localhost:${DAPR_HTTP_PORT}/v1.0/publish/pubsub/my-message
このアプリケーションは8083番ポートを利用します。
Pub/sub APIのURLは /v1.0/publish/(pubsub名)/(メッセージトピック)
となり、ここでは pubsub
という名前のpubsubを使っています。なんかちょっと分かりづらいですね。
なぜこの名前にしたかというと、ローカル環境のDaprのデフォルトpubsub名がそうなっているためです。前回のエントリーでデータストアの設定について説明した際、設定ファイルがは~/.dapr/components/statestore.yaml
にあると説明しました。
pubsubについても同様で、~/.dapr/components/pubsub.yaml
に初期設定が記述されています。
~/.dapr/components/pubsub.yaml
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub spec: type: pubsub.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: ""
metadata.name
がpubsubの名称です。要するにRedisをキューとして使う、という設定に pubsub
という名称がついているのです。ここではひとまずこのデフォルト設定のものを使い、Redisを使うことにします。なお、このRedisは dapr init
コマンドを実行した際に作成されたDockerコンテナに繋がるようになっています。
少しpubsub名の説明が長くなりましたが、元の話に戻りましょう。
Pub/sub APIのURLは /v1.0/publish/(pubsub名)/(メッセージトピック)
でした。最後のメッセージトピックは、メッセージをルーティングするためのパスのようなものです。RabbitMQを利用したことがある人ならexchangeの名称として使われる、と言えば分かりやすいかも知れません。
今回は my-message
という名前のメッセージトピックにしました。これが実際にどう使われるかは、後ほど説明します。
メッセージを受け取る側のアプリケーションの作成 (subscribe)
subscribe側のコントローラークラスの作成
続いてメッセージを受け取る側、つまりキューに対してデキューする側のWebアプリケーション(subscribeモジュール)を作成します。
メッセージを受け取る側は、通常のWeb APIとして作成します。
(subscribe) SubscribeController
@RestController public class SubscribeController { @PostMapping("/subscribe") public void subscribe(@RequestBody String message) { System.out.println(message); } }
メッセージを受け取って標準出力に出力するだけです。特にデキューする処理を書いたりせず、ただのWeb APIとして実装します。
subscribe側のアプリケーション設定ファイルの作成
このアプリケーションは8084番ポートを利用するため、設定ファイルにポート番号を書いておきます。
(subscribe) application.properties
server.port=8084
subscribe側のDapr設定ファイルの作成
さらに、上で作成したWeb APIでメッセージを受け取るように設定します。そのためにはDaprの Subscription
設定ファイルを作成する必要があります。このファイルは、少し抵抗があるかも知れませんが ~/.dapr/components/
ディレクトリに作成します。
~./dapr/components/subscription.yaml
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: subscription spec: pubsubname: pubsub topic: my-message route: /subscribe scopes: - subscribe-app
metadata.name
の値は特に利用しないため、任意の名前をつけて構いません。
pubsubname
はpubsubの名称です。デフォルトで設定されていた pubsub
を利用します。
topic
にはメッセージトピックを指定します。ここではpublish側のアプリケーションで指定した my-message
を指定しています。
route
は、呼び出すWebアプリケーションのパスです。SubscribeController
は /subscribe
というパスで待ち受けているため、それを指定します。
scopes
には、待ち受けているアプリケーションのapp-idを指定します。今回はsubscribe側のアプリケーションを subscribe-app
というapp-idで起動する予定ですので、それを指定しています。
ここに複数のアプリケーションのapp-idを列挙しておけば、同じメッセージを複数のアプリケーションで受け取れるようになるわけです。
※GitHubのサンプルコードでは、このファイルを subscribe/.dapr/components/subscription.yaml
に置いています。そちらを利用する場合は、ユーザーディレクトリにコピーしてください。
Daprを使ったアプリケーションの起動
それではDaprを使ってアプリケーションを起動します。まずは送る側のアプリケーション (publishモジュール) から起動します。
cd publish dapr run --app-id publish-app --app-port 8083 ../mvnw spring-boot:run
引数などは特に説明する必要はないですね。
続いて受け取る側のアプリケーション (subscribeモジュール) を起動します。
cd subscribe dapr run --app-id subscribe-app --app-port 8084 ../mvnw spring-boot:run
起動したら、publish側のAPIにJSONでメッセージを渡します。
curl -XPOST "localhost:8083/publish" -H "Content-type:application/json" -d '{ "name": "Shin Tanimoto", "twitter": "@cero_t" }'
コマンドを実行したら、subscribe側のコンソールを見てください。次のようなメッセージが表示されているはずです。
== APP == {"id":"661aace2-dcbe-4d6e-be26-e4d4313d5f9c","source":"publish-app","type":"com.dapr.event.sent","pubsubname":"pubsub","specversion":"1.0","datacontenttype":"application/json","topic":"my-message","traceid":"00-016cf5eccd3ce47cc9bb85ba90742553-244bd4331034542d-01","data":{"name":"Shin Tanimoto","twitter":"@cero_t"}}
何やらたくさんのパラメータが表示されましたが、メッセージ本体が data
に入っていることが確認できます。
CloudEvents 1.0について
ここで受け取ったメッセージには、メッセージ本体以外にもいくつかパラメータが追加されていますね。これらのパラメータはCloudEvents 1.0という仕様に準拠したものです。
CloudEvents 1.0ではいくつかパラメータが指定されており、この仕様に従えば他のミドルウェアとの相互運用性が上がって、Dapr以外のアプリケーションでも受信できるようになるとのこと。ただ正直、僕自身の経験としてはまだそのような使い方をしたことはありません。いつも取得したメッセージから data
の部分のみを取り出しています。
CloudEventsなど不要という声が多いのでしょうか、Daprのサイトにも、CloudEventsを使わないためのドキュメントがあります。
publishする際のURLに ?metadata.rawPayload=true
というクエリパラメータを追加すればメッセージのみが送られるとのことです。
(publish) application.properties
server.port=8083 pubsubUrl=http://localhost:${DAPR_HTTP_PORT}/v1.0/publish/pubsub/my-message?metadata.rawPayload=true
このように設定ファイルを書き換えてアプリケーションを再起動し、再度メッセージを送信すれば、subscribe側のコンソールに次のようなメッセージが表示されるはずです。
== APP == {"name":"Shin Tanimoto","twitter":"@cero_t"}
特にCloudEventsの仕様に準拠させる必要がない場合には、この方法を使っても良いでしょう。
ここまででRedisを介したメッセージングを行うことができました。キュー処理もデキュー処理も、RedisのAPIやSDKを利用するのではなく、HTTP処理だけで書けることが分かりました。
異なるメッセージブローカーを使う
ここまではRedisを用いたメッセージングを行いましたが、他のメッセージブローカーを使うこともできます。
サポートされているメッセージブローカーの一覧はこちらにあります。
ここではRabbitMQを使ってみることにします。まだRabbitMQ対応のコンポーネントはAlpha版のようですが、Dapr v1.5でデッドレターキュー(DLX)にも対応され、実際に運用しても特に問題は起きていません。
RabbitMQの起動
まずはRabbitMQを起動しましょう。Dockerを使って起動します。
docker run -d --name dapr_rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
キューは5672番ポート、管理コンソールは15672番ポートで待ち受けるようにしました。
アプリケーションの修正
Dapr設定ファイルの作成
続いて、このRabbitMQに接続するための設定ファイルをpublishモジュール、subscribeモジュールのcomponentsディレクトリにそれぞれ作成します。
components/rabbitmq-pubsub.yaml
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: rabbitmq-pubsub spec: type: pubsub.rabbitmq version: v1 metadata: - name: host value: amqp://localhost:5672
pubsubname
を分かりやすいよう rabbitmq-pubsub
にしています。
host
にはRabbitMQへの接続アドレスを指定します。
publish側のアプリケーション設定ファイルを追加
pusbsub名を変更したためPub/sub APIのパスも修正が必要です。URLの設定ファイルを追加で作成します。
(publish) application-rabbitmq.properties
pubsubUrl=http://localhost:${DAPR_HTTP_PORT}/v1.0/publish/rabbitmq-pubsub/my-message
メッセージトピックに my-message
を指定しました。
subscribe側にDar設定ファイルの追加
さらに、rabbitmq-pubsub
からデキューするようsubscribe側にSubscriptionの設定ファイルを作成します。
subscribe/components/subscription.yaml
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: subscription spec: pubsubname: rabbitmq-pubsub topic: my-message route: /subscribe scopes: - subscribe-app
先に作成した設定ファイルから pubsubname
の部分を rabbitmq-pubsub
に変えただけです。
設定を有効にしてアプリケーションを起動
publish側の起動
それでは作成した設定ファイルが有効になるようDaprを使ってアプリケーションを起動します。まずはpublish側から起動します。
いったん停止させた後、次のコマンドで起動します。
dapr run --app-id publish-app --app-port 8083 --components-path ./components -- ../mvnw spring-boot:run -Dspring-boot.run.profiles=rabbitmq
components/rabbitmq-pubsub.yaml
を有効にするために --components-path ./components
を追加したのと、application-rabbitmq.properties
が有効になるよう -Dspring-boot.run.profiles=rabbitmq
を追加しました。
この時点でブラウザからRabbitMQの管理コンソールにアクセスしてみましょう。
初期アカウントは Username
Password
ともに guest
です。
「Exchanges」タブでRabbitMQが管理しているExchangeの一覧を参照することができます。
この時点では、まだDaprに関するExchangeなどは特に作成されていませんね。
subscribe側の起動
続いて、subscribe側のアプリケーションを設定ファイルを有効にした状態で起動します。
dapr run --app-id subscribe-app --app-port 8084 --components-path ./components ../mvnw spring-boot:run
上と同じように --components-path ./components
を追加しました。
もう一度、RabbitMQの管理コンソールにアクセスして、「Exchanges」タブを開いてみます。
Exchangeに my-message
が作成されています。このExchangeをクリックして詳細を開いてみます。
このExchangeから subscribe-app-my-message
というQueueにバインドされた状態になっています。RabbitMQに不慣れな方に説明すると、Exchangeはキューのルーティングを行うようなもので、送り先のキューを選択したり、複数のキューに同じメッセージを送ったりすることができます。Queueは文字通り単一のキューです。
AWSに詳しければ、ExchangeはSNS、QueueがSQSのような役割を果たすものだと考えてください。
メッセージを送る
起動を確認できたら、次のコマンドでメッセージを送ってみます。
curl -XPOST "localhost:8083/publish" -H "Content-type:application/json" -d '{ "name": "Shin Tanimoto", "twitter": "@cero_t" }'
subscribe側のコンソールに次のようなメッセージが表示されたはずです。
== APP == {"specversion":"1.0","source":"publish-app","type":"com.dapr.event.sent","topic":"my-message","id":"e9f066d7-d129-430e-b069-4bf01a50fb2d","datacontenttype":"application/json","pubsubname":"rabbitmq-pubsub","traceid":"00-ee177cd7c4aaaf83c591c04e34e2edb2-49e157665d77f0e3-01","data":{"name":"Shin Tanimoto","twitter":"@cero_t"}}
pubsubname
が rabbitmq-pubsub
に変わっていることが分かります。CloudEventsを無効にしているとこれらのパラメータが表示されないので、確認のため有効にしておくと良いでしょう。
ソースコードを変えることなく、設定を変えるだけで接続先をRedisからRabbitMQに変えることができました。
まとめ
- Pub/sub APIを用いてメッセージブローカーに対してHTTPでメッセージを送ることができます
- メッセージブローカーからメッセージを受け取る側は、Web APIと設定ファイルだけで作成ができます
- つまりHTTPの送受信だけでメッセージングができます
- 設定を変えるだけで別のメッセージブローカーに切り替えることができます
なお、長くなるので本文中では触れませんでしたが、publish側のURLを http://localhost:8084/subscribe
に変更すれば、直接subscribe側のAPIを呼び出すこともできます。開発時にはDaprやメッセージブローカーを利用せずに同期処理としてビジネスロジックが正しく動くことを検証し、テスト時にはDaprやメッセージブローカーを持ちいて非同期通信で検証する、ということも可能になります。
特にキューを用いたアプリケーションをJUnitなどでテストする際には、非同期処理側が終わることを少し待ってからassertしなければなりませんが、テスト時には同期処理にしてしまえば、すぐに検証ができて楽ですよね。
今回はメッセージブローカーの独自APIやSDKを使わず、Daprを用いて通信をHTTP(gRPC)に寄せることで、設定だけで柔軟に環境を変えられるということが分かりました。僕がDaprを好きな理由の一つです。
それでは、また来週(明日)!