谷本 心 in せろ部屋

はてなダイアリーから引っ越してきました

Dapr Advent Calendar 5日目 - Daprでメッセージング

こんにちは、Dapr Advent Calendar 5日目です。日曜日なのでゆっくりエントリーを書きました。

Pub/Subメッセージングをしてみよう

今回はDaprを使ってキューを使ったメッセージングをします。このメッセージング機能(Pub/sub API)とサービス呼び出し(Invoke API)の2つが、僕がDaprを使おうという決め手になったくらい重要機能だと思っています。

f:id:cero-t:20211205083946p:plain
今回作るアプリケーション

Pub/sub機能についてのドキュメントはこちらにあります。

docs.dapr.io

今回作成するアプリケーションのソースコードgithubに置いてあります。

https://github.com/cero-t/dapr-advent-2021/

今回は、このリポジトリにある「publish」と「subscribe」モジュールを作ります。

メッセージを送る側のアプリケーションの作成 (publish)

publish側のコントローラークラスの作成

まずはメッセージを送る側、つまりキューに対してエンキューする側のWebアプリケーション(publishモジュール)を作成します。

メッセージを送るために、DaprのPub/sub APIを使います。

(publish) PublishController.java

@RestController
public class PublishController {
    private RestTemplate restTemplate;

    @Value("${pubsubUrl}")
    private String pubsubUrl;

    public PublishController(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody Object message) {
        restTemplate.postForObject(pubsubUrl, message, Void.class);
    }
}

publish メソッドにて、リクエストで受け取ったオブジェクトをそのまま pubsuburl(DaprのPub/sub APIのURL)に投げているだけです。

publish側のアプリケーション設定ファイルの作成

前回のエントリーではURLをソースコードに直書きしましたが、今回は環境への依存性を下げるためURLを設定ファイルに書くことにしました。

(publish) application.properties

server.port=8083
pubsubUrl=http://localhost:${DAPR_HTTP_PORT}/v1.0/publish/pubsub/my-message

このアプリケーションは8083番ポートを利用します。

Pub/sub APIのURLは /v1.0/publish/(pubsub名)/(メッセージトピック) となり、ここでは pubsub という名前のpubsubを使っています。なんかちょっと分かりづらいですね。

なぜこの名前にしたかというと、ローカル環境のDaprのデフォルトpubsub名がそうなっているためです。前回のエントリーでデータストアの設定について説明した際、設定ファイルがは~/.dapr/components/statestore.yaml にあると説明しました。

pubsubについても同様で、~/.dapr/components/pubsub.yaml に初期設定が記述されています。

~/.dapr/components/pubsub.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

metadata.name がpubsubの名称です。要するにRedisをキューとして使う、という設定に pubsub という名称がついているのです。ここではひとまずこのデフォルト設定のものを使い、Redisを使うことにします。なお、このRedisは dapr init コマンドを実行した際に作成されたDockerコンテナに繋がるようになっています。

少しpubsub名の説明が長くなりましたが、元の話に戻りましょう。

Pub/sub APIのURLは /v1.0/publish/(pubsub名)/(メッセージトピック) でした。最後のメッセージトピックは、メッセージをルーティングするためのパスのようなものです。RabbitMQを利用したことがある人ならexchangeの名称として使われる、と言えば分かりやすいかも知れません。

今回は my-message という名前のメッセージトピックにしました。これが実際にどう使われるかは、後ほど説明します。

メッセージを受け取る側のアプリケーションの作成 (subscribe)

subscribe側のコントローラークラスの作成

続いてメッセージを受け取る側、つまりキューに対してデキューする側のWebアプリケーション(subscribeモジュール)を作成します。

メッセージを受け取る側は、通常のWeb APIとして作成します。

(subscribe) SubscribeController

@RestController
public class SubscribeController {
    @PostMapping("/subscribe")
    public void subscribe(@RequestBody String message) {
        System.out.println(message);
    }
}

メッセージを受け取って標準出力に出力するだけです。特にデキューする処理を書いたりせず、ただのWeb APIとして実装します。

subscribe側のアプリケーション設定ファイルの作成

このアプリケーションは8084番ポートを利用するため、設定ファイルにポート番号を書いておきます。

(subscribe) application.properties

server.port=8084

subscribe側のDapr設定ファイルの作成

さらに、上で作成したWeb APIでメッセージを受け取るように設定します。そのためにはDaprの Subscription 設定ファイルを作成する必要があります。このファイルは、少し抵抗があるかも知れませんが ~/.dapr/components/ ディレクトリに作成します。

~./dapr/components/subscription.yaml

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: subscription
spec:
  pubsubname: pubsub
  topic: my-message
  route: /subscribe
scopes:
  - subscribe-app

metadata.name の値は特に利用しないため、任意の名前をつけて構いません。

pubsubname はpubsubの名称です。デフォルトで設定されていた pubsub を利用します。

topic にはメッセージトピックを指定します。ここではpublish側のアプリケーションで指定した my-message を指定しています。

route は、呼び出すWebアプリケーションのパスです。SubscribeController/subscribe というパスで待ち受けているため、それを指定します。

scopes には、待ち受けているアプリケーションのapp-idを指定します。今回はsubscribe側のアプリケーションを subscribe-app というapp-idで起動する予定ですので、それを指定しています。

ここに複数のアプリケーションのapp-idを列挙しておけば、同じメッセージを複数のアプリケーションで受け取れるようになるわけです。

GitHubのサンプルコードでは、このファイルを subscribe/.dapr/components/subscription.yaml に置いています。そちらを利用する場合は、ユーザーディレクトリにコピーしてください。

Daprを使ったアプリケーションの起動

それではDaprを使ってアプリケーションを起動します。まずは送る側のアプリケーション (publishモジュール) から起動します。

cd publish
dapr run --app-id publish-app --app-port 8083 ../mvnw spring-boot:run

引数などは特に説明する必要はないですね。

続いて受け取る側のアプリケーション (subscribeモジュール) を起動します。

cd subscribe
dapr run --app-id subscribe-app --app-port 8084 ../mvnw spring-boot:run

起動したら、publish側のAPIJSONでメッセージを渡します。

curl -XPOST "localhost:8083/publish" -H "Content-type:application/json" -d '{
  "name": "Shin Tanimoto",
  "twitter": "@cero_t"
}'

コマンドを実行したら、subscribe側のコンソールを見てください。次のようなメッセージが表示されているはずです。

== APP == {"id":"661aace2-dcbe-4d6e-be26-e4d4313d5f9c","source":"publish-app","type":"com.dapr.event.sent","pubsubname":"pubsub","specversion":"1.0","datacontenttype":"application/json","topic":"my-message","traceid":"00-016cf5eccd3ce47cc9bb85ba90742553-244bd4331034542d-01","data":{"name":"Shin Tanimoto","twitter":"@cero_t"}}

何やらたくさんのパラメータが表示されましたが、メッセージ本体が data に入っていることが確認できます。

CloudEvents 1.0について

ここで受け取ったメッセージには、メッセージ本体以外にもいくつかパラメータが追加されていますね。これらのパラメータはCloudEvents 1.0という仕様に準拠したものです。

github.com

CloudEvents 1.0ではいくつかパラメータが指定されており、この仕様に従えば他のミドルウェアとの相互運用性が上がって、Dapr以外のアプリケーションでも受信できるようになるとのこと。ただ正直、僕自身の経験としてはまだそのような使い方をしたことはありません。いつも取得したメッセージから data の部分のみを取り出しています。

CloudEventsなど不要という声が多いのでしょうか、Daprのサイトにも、CloudEventsを使わないためのドキュメントがあります。

docs.dapr.io

publishする際のURLに ?metadata.rawPayload=true というクエリパラメータを追加すればメッセージのみが送られるとのことです。

(publish) application.properties

server.port=8083
pubsubUrl=http://localhost:${DAPR_HTTP_PORT}/v1.0/publish/pubsub/my-message?metadata.rawPayload=true

このように設定ファイルを書き換えてアプリケーションを再起動し、再度メッセージを送信すれば、subscribe側のコンソールに次のようなメッセージが表示されるはずです。

== APP == {"name":"Shin Tanimoto","twitter":"@cero_t"}

特にCloudEventsの仕様に準拠させる必要がない場合には、この方法を使っても良いでしょう。

ここまででRedisを介したメッセージングを行うことができました。キュー処理もデキュー処理も、RedisのAPISDKを利用するのではなく、HTTP処理だけで書けることが分かりました。

f:id:cero-t:20211205084027p:plain
Daprを経由したメッセージング

異なるメッセージブローカーを使う

ここまではRedisを用いたメッセージングを行いましたが、他のメッセージブローカーを使うこともできます。

サポートされているメッセージブローカーの一覧はこちらにあります。

docs.dapr.io

ここではRabbitMQを使ってみることにします。まだRabbitMQ対応のコンポーネントはAlpha版のようですが、Dapr v1.5でデッドレターキュー(DLX)にも対応され、実際に運用しても特に問題は起きていません。

RabbitMQの起動

まずはRabbitMQを起動しましょう。Dockerを使って起動します。

docker run -d --name dapr_rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

キューは5672番ポート、管理コンソールは15672番ポートで待ち受けるようにしました。

アプリケーションの修正

Dapr設定ファイルの作成

続いて、このRabbitMQに接続するための設定ファイルをpublishモジュール、subscribeモジュールのcomponentsディレクトリにそれぞれ作成します。

components/rabbitmq-pubsub.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbitmq-pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: amqp://localhost:5672

pubsubname を分かりやすいよう rabbitmq-pubsub にしています。

host にはRabbitMQへの接続アドレスを指定します。

publish側のアプリケーション設定ファイルを追加

pusbsub名を変更したためPub/sub APIのパスも修正が必要です。URLの設定ファイルを追加で作成します。

(publish) application-rabbitmq.properties

pubsubUrl=http://localhost:${DAPR_HTTP_PORT}/v1.0/publish/rabbitmq-pubsub/my-message

メッセージトピックに my-message を指定しました。

subscribe側にDar設定ファイルの追加

さらに、rabbitmq-pubsub からデキューするようsubscribe側にSubscriptionの設定ファイルを作成します。

subscribe/components/subscription.yaml

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: subscription
spec:
  pubsubname: rabbitmq-pubsub
  topic: my-message
  route: /subscribe
scopes:
  - subscribe-app

先に作成した設定ファイルから pubsubname の部分を rabbitmq-pubsub に変えただけです。

設定を有効にしてアプリケーションを起動

publish側の起動

それでは作成した設定ファイルが有効になるようDaprを使ってアプリケーションを起動します。まずはpublish側から起動します。

いったん停止させた後、次のコマンドで起動します。

dapr run --app-id publish-app --app-port 8083 --components-path ./components -- ../mvnw spring-boot:run -Dspring-boot.run.profiles=rabbitmq

components/rabbitmq-pubsub.yaml を有効にするために --components-path ./components を追加したのと、application-rabbitmq.properties が有効になるよう -Dspring-boot.run.profiles=rabbitmq を追加しました。

この時点でブラウザからRabbitMQの管理コンソールにアクセスしてみましょう。

http://localhost:15672/

初期アカウントは Username Password ともに guest です。

「Exchanges」タブでRabbitMQが管理しているExchangeの一覧を参照することができます。

f:id:cero-t:20211205082123j:plain
publish起動後のExchange一覧

この時点では、まだDaprに関するExchangeなどは特に作成されていませんね。

subscribe側の起動

続いて、subscribe側のアプリケーションを設定ファイルを有効にした状態で起動します。

dapr run --app-id subscribe-app --app-port 8084 --components-path ./components ../mvnw spring-boot:run

上と同じように --components-path ./components を追加しました。

もう一度、RabbitMQの管理コンソールにアクセスして、「Exchanges」タブを開いてみます。

f:id:cero-t:20211205082355j:plain
subscribe起動後のExchange一覧

Exchangeに my-message が作成されています。このExchangeをクリックして詳細を開いてみます。

f:id:cero-t:20211205082712j:plain
my-messageの詳細

このExchangeから subscribe-app-my-message というQueueにバインドされた状態になっています。RabbitMQに不慣れな方に説明すると、Exchangeはキューのルーティングを行うようなもので、送り先のキューを選択したり、複数のキューに同じメッセージを送ったりすることができます。Queueは文字通り単一のキューです。 AWSに詳しければ、ExchangeはSNS、QueueがSQSのような役割を果たすものだと考えてください。

メッセージを送る

起動を確認できたら、次のコマンドでメッセージを送ってみます。

curl -XPOST "localhost:8083/publish" -H "Content-type:application/json" -d '{
  "name": "Shin Tanimoto",
  "twitter": "@cero_t"
}'

subscribe側のコンソールに次のようなメッセージが表示されたはずです。

== APP == {"specversion":"1.0","source":"publish-app","type":"com.dapr.event.sent","topic":"my-message","id":"e9f066d7-d129-430e-b069-4bf01a50fb2d","datacontenttype":"application/json","pubsubname":"rabbitmq-pubsub","traceid":"00-ee177cd7c4aaaf83c591c04e34e2edb2-49e157665d77f0e3-01","data":{"name":"Shin Tanimoto","twitter":"@cero_t"}}

pubsubnamerabbitmq-pubsub に変わっていることが分かります。CloudEventsを無効にしているとこれらのパラメータが表示されないので、確認のため有効にしておくと良いでしょう。

ソースコードを変えることなく、設定を変えるだけで接続先をRedisからRabbitMQに変えることができました。

まとめ

  • Pub/sub APIを用いてメッセージブローカーに対してHTTPでメッセージを送ることができます
  • メッセージブローカーからメッセージを受け取る側は、Web APIと設定ファイルだけで作成ができます
  • つまりHTTPの送受信だけでメッセージングができます
  • 設定を変えるだけで別のメッセージブローカーに切り替えることができます

なお、長くなるので本文中では触れませんでしたが、publish側のURLを http://localhost:8084/subscribe に変更すれば、直接subscribe側のAPIを呼び出すこともできます。開発時にはDaprやメッセージブローカーを利用せずに同期処理としてビジネスロジックが正しく動くことを検証し、テスト時にはDaprやメッセージブローカーを持ちいて非同期通信で検証する、ということも可能になります。

特にキューを用いたアプリケーションをJUnitなどでテストする際には、非同期処理側が終わることを少し待ってからassertしなければなりませんが、テスト時には同期処理にしてしまえば、すぐに検証ができて楽ですよね。

今回はメッセージブローカーの独自APISDKを使わず、Daprを用いて通信をHTTP(gRPC)に寄せることで、設定だけで柔軟に環境を変えられるということが分かりました。僕がDaprを好きな理由の一つです。

それでは、また来週(明日)!