
Kubernetes
イベント
該当するコンテンツが見つかりませんでした
マガジン
技術ブログ
AI事業本部 アドテクカンパニー Dynalystに所属している平田聡一朗と申します。本記事ではSt ...
こんにちは、サイオステクノロジー武井です。今回は、イケてるOSSであるDaprについて一筆したためました。 Daprとは? Daprとは、Distributed Application Runtimeの略であり、本当にざっくり一言で言えば、分散アプリケーションサービスを開発する際のインフラレイヤーを抽象化するためのものです。 ここは説明が非常に難しいので、この章では、ざっくりとした理解で構いません。順を追ってDaprの本質に迫っていきたいと思います。 ここでDaprを理解するために一つの例を上げてみましょう。 例えば、とあるアプリケーションで、ユーザーの情報を保存するために、データベースを使用しているとします。当然従来のアプリケーションでは、当然MySQLにはMySQL用の実装を、PostgreSQLにはPostgreSQL用の実装をする必要があります。これをDaprを使用することで、データベースの種類に関係なく、同じコードでデータベースにアクセスすることができます。つまり、Daprは、データベースの種類を抽象化してくれるのです。 他にも、Daprは、Publish and Subscribe、State Management、Bindingsなど分散アプリケーションを開発する際に必要な機能を提供しているのですが、これを実現するアーキテクチャとして最も重要なのが、Daprのサイドカーです。 このサイドカーの仕組みをわかりやすく伝えるために、データベースにアクセスする際の例を用いて、「Daprを使わない場合」と「Daprを使う場合」の違いを見てみましょう。 Daprを使わない場合 従来のアプリケーションでは、データベースにアクセスするためのコードは、アプリケーションの中に直接書かれています。例えば、MySQLにアクセスするためのコードは、MySQL用のライブラリを使用して書かれています。その構成は以下の通りとなります。 例えば、ここでMySQLをPostgreSQLに変更したい場合、MySQL用のコードを削除して、PostgreSQL用のコードを書き直す必要があります。書き直した後の構成は以下の通りとなります。 先ほどとは違い、アプリ内の「MySQL用のコード」が「PostgreSQL用のコード」に変わっていますね。このように、Daprを使わない場合、データベースの種類を変更するたびに、アプリケーションのコードを変更する必要があります。これが、Daprを使う場合と比べて、非常に面倒であることがわかります。 Daprを使う場合 Daprを使う場合、アプリケーションのコードは、DaprのAPIを使用してデータベースにアクセスします。Daprは、データベースの種類を抽象化してくれるため、アプリケーションのコードは、データベースの種類に関係なく同じコードでアクセスすることができます。 MySQLにアクセスするためのDaprを使う場合の構成は以下の通りとなります。 「Daprを使わない場合」と大きく異なるのは、サイドカーが存在していることです。アプリケーションは、DaprのAPIを使用してサイドカーにアクセスし、サイドカーがデータベースにアクセスするという構成になっています。この構成のメリットは、データベースの種類を変更する際に、アプリケーションのコードを変更する必要がないことです。例えば、MySQLからPostgreSQLに変更したい場合、サイドカーの設定を変更するだけで、アプリケーションのコードはそのままで済みます。変更後の構成は以下の通りとなります。 アプリ側は全く変更してないのがわかりますでしょうか?変更したのは、サイドカーの設定だけです。この設定では、サイドカーにてPostgreSQLにアクセスするように変更しています。 このように、Daprを使う場合、データベースの種類を変更する際に、アプリケーションのコードを変更する必要がないため、非常に便利であることがわかります。 一方で、このサイドカーが対応していないデータベースを使用したい場合は、サイドカーの設定を変更するだけでは対応できないため、アプリケーションのコードを変更する必要があります。 この例では、データベースへのアクセスを例に挙げましたが、Daprは、Publish and Subscribe、State Management、Bindingsなど、分散アプリケーションを開発する際に必要な機能を提供しているため、これらの機能も同様に抽象化されていることがわかります。つまり、Daprを使うことで、分散アプリケーションの開発が非常に楽になるということです。 サイドカーの実行形態について Daprのサイドカーの実行形態は大きく分けると、以下の2つがあります。 サイドカーコンテナ サイドカープロセス サイドカーコンテナ サイドカーコンテナは、Daprのサイドカーがコンテナとして実行される形態です。これは、Kubernetesなどのコンテナオーケストレーションツールを使用している場合に一般的に使用されます。Daprのサイドカーは、アプリケーションと同じPod内のコンテナとして実行されます。アプリケーションとDaprはlocalhostで通信するため、低レイテンシで連携できます。図中の「Scheduler」「Placement」については、Daprのコントロールプレーンコンポーネントであり、サイドカーコンテナとは別に実行されます。本記事では、サイドカーコンテナの説明に焦点を当てるため、これらのコントロールプレーンコンポーネントに関する説明は省略しています。 サイドカープロセス Daprのサイドカーは、VMやローカル環境ではアプリケーションと同じホスト上の別プロセスとして実行されます。アプリケーションは、HTTPまたはgRPCを通じてローカルのdaprdプロセスに接続します。この形態は、Kubernetes以外の環境でDaprを利用する場合によく使われる実行方法です。 以下の図は、開発環境でDaprを利用する際のサイドカープロセスの構成例です。dapr CLIというDapr専用のコマンドラインツールを使用すると、アプリケーションとサイドカープロセスを同時に起動できます。 RedisとZipkinは、Daprのサイドカーが利用する外部コンポーネントの例です。Redisは、DaprのState ManagementやPublish and Subscribeなどの機能でよく利用されるデータストアです。またZipkinは、Daprの分散トレーシング機能で使用されるトレーシングシステムです。これらのコンポーネントは、dapr CLIによって起動されます。 本記事で紹介するサンプルアプリケーションは、ここで説明したように、dapr CLIによって起動されるサイドカープロセスの構成で動作することを前提としています。ただし、実行形態が異なる場合でも基本的な考え方は同じであり、他の実行形態にも応用できます。そのため、本記事ではサイドカープロセスの形態を前提として説明していきます。 Daprを使うための環境準備 先程説明しましたように、Daprを使うための環境は様々なです。例えば、Kubernetes環境でDaprを利用する場合は、Kubernetesクラスターを用意し、Daprをインストールする必要があります。一方で、ローカル環境でDaprを利用する場合は、dapr CLIをインストールするだけで済みます。 本記事の中で説明するサンプルアプリケーションは、ローカル環境でDaprを利用することを前提としています。そのため、dapr CLIをインストールする必要があります。dapr CLIとは、Daprをローカル環境で利用するためのコマンドラインツールです。dapr CLIを使用することで、Daprのサイドカープロセスを簡単に起動したり、Daprのコンポーネントを管理したりすることができます。 そのインストール手順は以下のURLに記載されていますので、それを見てさっくりとインストールしてみてください。 https://docs.dapr.io/getting-started/install-dapr-cli/ そのインストール方法はめっちゃ簡単です。一例としてLinuxにDapr CLIをインストールする手順は以下の通りです。 $ wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash そして、次に、初期化が必要になります。これは、ローカル環境でDaprを使うための様々なコンポーネントを起動するためのコマンドです。初期化のコマンドは以下の通りです。 $ dapr init このコマンドにより、Daprのサイドカープロセスであるdaprdがインストールされ、そしてDaprを動作させるために必要な基本コンポーネントであるRedisやZipkinなども起動されます。試しに以下のコマンドを実行してみてください。 $ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 6822763b7ba6 redis:6 "docker-entrypoint.s…" 2 weeks ago Up 13 days 0.0 .0.0:6379- > 6379 /tcp, [ :: ] :6379- > 6379 /tcp dapr_redis d5bf3b79e44d daprio/dapr:1.17.0 "./scheduler --etcd-…" 2 weeks ago Up 13 days 0.0 .0.0:2379- > 2379 /tcp, [ :: ] :2379- > 2379 /tcp, 0.0 .0.0:50006- > 50006 /tcp, [ :: ] :50006- > 50006 /tcp, 0.0 .0.0:58081- > 8080 /tcp, [ :: ] :58081- > 8080 /tcp, 0.0 .0.0:59091- > 9090 /tcp, [ :: ] :59091- > 9090 /tcp dapr_scheduler c79f84b94e2f daprio/dapr:1.17.0 "./placement" 2 weeks ago Up 13 days 0.0 .0.0:50005- > 50005 /tcp, [ :: ] :50005- > 50005 /tcp, 0.0 .0.0:58080- > 8080 /tcp, [ :: ] :58080- > 8080 /tcp, 0.0 .0.0:59090- > 9090 /tcp, [ :: ] :59090- > 9090 /tcp dapr_placement dbf0fab9540b openzipkin/zipkin "start-zipkin" 2 weeks ago Up 13 days ( healthy ) 0.0 .0.0:9411- > 9411 /tcp, [ :: ] :9411- > 9411 /tcp dapr_zipkin 様々なコンテナが起動していることがわかりますね。これらのコンテナは、DaprのコントロールプレーンコンポーネントであるSchedulerやPlacement、そしてDaprの分散トレーシング機能で使用されるZipkinなどです。それぞれのコンテナの説明は割愛します。ここでは、 これで、ローカル環境でDaprを利用するための環境が整いました。 Daprの様々なコンポーネント Daprにはさまざまなコンポーネントが用意されています。これらのコンポーネントは、Daprの機能を実現するためのものであり、アプリケーションはDaprを介してそれらを利用できます。 例えば、Daprには状態を保存するための State Management コンポーネントがあります。このコンポーネントを利用するには、設定ファイルで対象のコンポーネントを定義します。すると、Daprサイドカー(daprd)がそのコンポーネントを読み込み、対応するインフラへ接続できるようになります。 その結果、アプリケーションがサイドカーに対して状態を保存するAPIを呼び出すと、サイドカーはState Managementコンポーネントを通じて、RedisやAzure Cosmos DBなどのバックエンドストレージに状態を保存できるようになります。 本章では、Daprの代表的なコンポーネントであるState Management、Publish and Subscribe、Service Invocation、Bindingsについて、簡単に説明していきます。これらのコンポーネントは、Daprを使用する際に非常に重要な役割を果たすため、理解しておくことが重要です。 そして、以降では、これらのコンポーネントを実際に利用するためのサンプルコードも紹介しながら、Daprの機能を説明していきます。サンプルコードはGitHubのリポジトリに公開していますので、ぜひ参考にしてみてください。 https://github.com/noriyukitakei/dapr-sample State Management まずは、State Managementコンポーネントについて説明します。State Managementコンポーネントは、Key/Value形式の状態を保存するためのコンポーネントです。Daprは、State Managementコンポーネントを通じて、RedisやAzure Cosmos DBなどのバックエンドストレージに状態を保存できるようになります。 Key/Value形式のデータの代表例で言えば、ユーザーのセッション情報や、IoTデバイスの状態(例: 温度センサーの最新の温度値)などが挙げられます。これらのデータは、アプリケーションの状態を管理するために頻繁に使用されます。 システム構成 State Managementコンポーネントを利用する際のシステム構成は以下の通りとなります。 サイドカーは、Redis用の設定ファイルを読み込むことで、Redisに接続できるようになります。アプリケーションは、DaprのAPIを使用してサイドカーに状態を保存するリクエストを送ります。サイドカーは、そのリクエストを受け取ると、State Managementコンポーネントを通じて、Redisに状態を保存します。 ファイル構成 State Managementコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ( https://github.com/noriyukitakei/dapr-sample )の中の、State Managementディレクトリの中に配置されています。 StateManagement ├── app.py ├── Infrastructure │ └── components │ ├── redis │ │ └── statestore_redis.yaml │ └── sqlite │ └── statestore_sqlite.yaml ├── README.md └── requirements.txt ではこれらのファイルを一つずつ紐解くことで、State Managementコンポーネントを利用するための構成を理解していきましょう。 ソースコードの説明 ■ Infrastructure/components/redis/statestore_redis.yaml Infrastructure/components/redis/statestore_redis.yamlは、DaprのState ManagementコンポーネントをRedisに接続するための設定ファイルです。このファイルには、DaprがRedisに接続するための情報が記載されています。 apiVersion : dapr.io/v1alpha1 # Daprのリソース定義のバージョン kind : Component # 設定ファイルの種別 metadata : name : statestore # アプリから参照するコンポーネント名 spec : type : state.redis # コンポーネントの種別 version : v1 # コンポーネントのバージョン metadata : - name : redisHost value : localhost : 6379 # Redisの接続先 - name : redisPassword value : "" # パスワード(未設定) このファイルは一言で言うと、「DaprのState ManagementでRedisを使う」ことを定義しているものです。アプリケーションはRedisを直接操作するのではなく、DaprのAPIを通じて状態を保存します。 例えばアプリケーションから次のように呼び出すと、 client . save_state ( "statestore" , "user1" , data ) Daprは内部的に先程のコンポーネント定義を参照して、Redisに接続し、状態を保存します。つまり以下のような流れです。 アプリケーションがDaprのAPIを呼び出す。そのとき、コンポーネント名として「statestore」を指定する。 Daprがコンポーネント定義の中から、metadata.nameが「statestore」であるコンポーネント定義を探す。 Daprがコンポーネント定義を参照して、Redisに接続するための情報(ここでは接続先がlocalhost:6379、パスワードが空)を取得する。 Daprが取得した情報を基に、Redisに接続する。 Daprがアプリケーションから受け取った状態をRedisに保存する。 Daprが保存の結果をアプリケーションに返す。 ■ app.py app.pyは、DaprのState Managementコンポーネントを利用するためのアプリケーションコードです。このコードは、DaprのPython SDKを使用して、状態を保存するためのAPIを呼び出しています。 では、コードの内容を見てみましょう。 for i in range ( 1 , 10 ) : json_data = [ { "key" : str ( i ) , "value" : { "orderId" : i } } ] requests . post ( "http://localhost:3611/v1.0/state/statestore" , json = json_data ) time . sleep ( 1 ) このコードは、1から9までの数字をキーとし、その値としてorderIdを持つJSONオブジェクト(以下参照)を作成し、DaprのState Management APIにPOSTリクエストを送っています。 [ { "key" : "1" , "value" : { "orderId" : 1 } } , { "key" : "2" , "value" : { "orderId" : 2 } } , ⋯以下略⋯ ] リクエストのURLには、先程のコンポーネント定義で指定した「statestore」が含まれています。これにより、Daprは「statestore」という名前のState Managementコンポーネントを参照して、Redisに状態を保存します。 リクエストのURLの構成は以下の通りです。 http://localhost:3611: Daprサイドカーのエンドポイント ※ このポート番号については、Dapr CLIを使用してサイドカープロセスを起動する際に指定したポート番号になります。後ほど説明しますが、Dapr CLIを使用してサイドカープロセスを起動する際に、–app-portオプションでアプリケーションのポート番号を指定することができます。例えば、アプリケーションのポート番号を3611に指定した場合、Daprサイドカーはlocalhost:3611で待ち受けるようになります。 /v1.0: Dapr APIのバージョン /state: State Management APIを呼び出すことを示すパス /statestore: 先程のコンポーネント定義で指定した「statestore」という名前のState Managementコンポーネントを参照するためのパス result = requests . get ( f"http://localhost:3611/v1.0/state/statestore/ { i } " ) print ( result . json ( ) ) time . sleep ( 1 ) このコードは、先程保存した状態をDaprのState Management APIにGETリクエストを送って取得しています。リクエストのURLには、先程のコンポーネント定義で指定した「statestore」が含まれていることに加えて、最後に/{i}が追加されています。これにより、Daprは「statestore」という名前のState Managementコンポーネントを参照して、Redisからキーが{i}である状態を取得します。リクエストのURLの構成は以下の通りです。 http://localhost:3611: Daprサイドカーのエンドポイント /v1.0: Dapr APIのバージョン /state: State Management APIを呼び出すことを示すパス /statestore: 先程のコンポーネント定義で指定した「statestore」という名前のState Managementコンポーネントを参照するためのパス /{i}: 取得したい状態のキーを指定するためのパス requests . delete ( f"http://localhost:3611/v1.0/state/statestore/ { i } " ) time . sleep ( 1 ) このコードは、先程保存した状態をDaprのState Management APIにDELETEリクエストを送って削除しています。リクエストのURLの構成は、先程のGETリクエストと同様であり、最後に/{i}が追加されていることがわかります。これにより、Daprは「statestore」という名前のState Managementコンポーネントを参照して、Redisからキーが{i}である状態を削除します。リクエストのURLの構成は以下の通りです。 http://localhost:3611: Daprサイドカーのエンドポイント /v1.0: Dapr APIのバージョン /state: State Management APIを呼び出すことを示すパス /statestore: 先程のコンポーネント定義で指定した「statestore」という名前のState Managementコンポーネントを参照するためのパス /{i}: 削除したい状態のキーを指定するためのパス 実行方法 では次に、State Managementコンポーネントを利用するためのアプリケーションコードを実行してみましょう。実行する前に、Dapr CLIを使用してサイドカープロセスを起動する必要があります。ここではその手順を説明します。 $ git clone https://github.com/noriyukitakei/dapr-sample $ cd dapr-sample/StateManegement まずは、先程紹介したGitHubリポジトリからサンプルコードをクローンして、StateManagementディレクトリに移動します。 $ pip install -r requirements.txt 次に、Pythonの依存関係をインストールします。requirements.txtには、このアプリケーションコードを実行するために必要なPythonパッケージが記載されています。 ちなみにRedisについては、dapr initコマンドを実行した際に、DaprのState ManagementやPublish and Subscribeなどの機能でよく利用されるデータストアであるRedisも起動されます。ですので、特にRedisを起動するためのコマンドを実行する必要はありません。 $ dapr run --app-id statemanagement --dapr-http-port 3611 --components-path Infrastructure/components/redis -- python app.py dapr runコマンドを使用して、アプリケーションコードを実行します。 –app-idオプションでアプリケーションのIDを指定します。ここでは「statemanagement」というIDを指定しています。このIDは、Daprサイドカーがアプリケーションを識別するために使用されます。State Managementでは利用しませんが、Service Invocationなどの機能を利用する際に、このIDが重要になります。 –dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3611を指定しています。これにより、Daprサイドカーはlocalhost:3611で待ち受けるようになります。 –components-pathオプションで、Daprのコンポーネント定義ファイルが配置されているディレクトリを指定します。ここではInfrastructure/components/redisを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。 図解すると以下のような対応関係になります。 このコマンドを実行すると、Daprサイドカーが起動し、そしてアプリケーションコードも実行されます。アプリケーションコードは、DaprのState Management APIを呼び出して、状態を保存、取得、削除するリクエストを送ります。Daprサイドカーは、そのリクエストを受け取ると、State Managementコンポーネントを通じて、Redisに状態を保存、取得、削除します。 保存先をSQLiteに変更してみる では、Daprのメリットを体感して頂くために、状態の保存先をRedisからSQLiteに変更してみましょう。Daprを使わない場合は、アプリケーションコードを変更する必要がありますが、Daprを使う場合は、コンポーネント定義ファイルを変更するだけで済みます。 つまりこんな感じです。 変更後のコンポーネント定義ファイルは以下の通りとなります。 apiVersion : dapr.io/v1alpha1 kind : Component metadata : name : statestore spec : type : state.sqlite version : v1 metadata : - name : connectionString value : "sqlite/data.db" まず、Redis用のコンポーネント定義ファイルと比較すると、typeがstate.sqliteに変わっています。これにより、DaprはState ManagementコンポーネントとしてSQLiteを使用するようになります。そして、spec.metadataの内容も変わっています。SQLiteに接続するための情報を記載する必要があるため、connectionStringという名前のメタデータを追加しています。これにより、DaprはSQLiteに接続するための情報を取得できるようになります。 このファイルは、Infrastructure/components/sqlite/statestore_sqlite.yamlに配置されています。 先程のdapr runコマンドの–components-pathオプションで、このファイルが配置されているディレクトリを指定することで、Daprサイドカーはこのファイルを読み込むようになります。 $ dapr run --app-id statemanagement --dapr-http-port 3611 --components-path Infrastructure/components/sqlite -- python app.py いかがでしょうか?アプリケーションコードを一切変更することなく、状態の保存先をRedisからSQLiteに変更することができましたね。これがDaprのメリットの一つである、インフラレイヤーの抽象化による柔軟性の高さです。 Publish and Subscribe 次は、Publish and Subscribeコンポーネントについて説明します。DaprのPublish and Subscribeコンポーネントについて説明する前に、まずはPublish and Subscribeの概念について簡単に説明します。 例えば、あるアプリケーションで、ユーザーが新しい注文を作成したとします。そのとき、注文が作成されたことを他のサービスに通知したい場合があります。例えば、在庫管理サービスや配送サービスなどです。このような場合に、Publish and Subscribeの仕組みが役立ちます。 Publish and Subscribeを使わない場合で考えてみましょう。ユーザーが新しい注文を作成したとき、注文サービスは在庫管理サービスや配送サービスに対して、HTTPリクエストを送って通知します。注文サービスは、配送サービス、在庫管理サービスの両方から正常にレスポンスが返ってきたら、注文が正常に処理されたと判断します。 もし、ここで配送サービスがダウンしている場合、注文サービスは配送サービスに通知することができません。そのため、注文サービスは失敗して、ユーザーは注文を作成することができません。 そこでPublish and Subscribeの仕組みを使うと、注文サービスは、注文が作成されたことを「メッセージブローカー」に対して通知します。そして、配送サービスや在庫管理サービスは、そのメッセージブローカーから注文が作成されたことを受け取ります。これにより、注文サービスは配送サービスや在庫管理サービスの状態に関係なく、注文が作成されたことを通知することができます。そして、配送サービスが仮にダウンしていたとしても、注文サービスは注文が作成されたことをメッセージブローカーに通知することができるため、ユーザーは注文を作成することができ、かつ在庫管理サービスは注文が作成されたことを受け取ることができます。配送サービスが復旧したときに、配送サービスも注文が作成されたことを受け取ることができます。 このように、Publish and Subscribeの仕組みを使うことで、サービス間の疎結合な連携が可能になります。DaprのPublish and Subscribeコンポーネントは、このようなPublish and Subscribeの仕組みを提供するためのコンポーネントです。Daprを使用することで、アプリケーションは、DaprのAPIを通じて、メッセージブローカーに対してメッセージを公開したり、メッセージブローカーからメッセージを受け取ったりすることができます。 システム構成 DaprでPublish and Subscribeコンポーネントを利用する際のシステム構成は以下の通りとなります。メッセージブローカーとしてRedisを使用する場合の構成例を示しています。 ① まず、Publisherが、DaprのHTTP/gRPC APIを通じて、サイドカーに対してメッセージを公開するリクエストを送ります。 ② サイドカーは、そのリクエストを受け取ると、Publish and Subscribeコンポーネントを通じて、Redisなどのメッセージブローカーに対して、メッセージを送信します。 ③ サイドカーは、Redisに接続し、Redisからメッセージを受け取ります。 ④ サイドカーは、そのリクエストを受け取ると、事前にコンポーネントで指定したSubscriberのエンドポイントに対して、メッセージを送信します。 もちろん、PublisherとSubscriberは、メッセージを格納するためのデータストアに何を使っているのかを知る必要はありません。RedisだろうがRabbitMQだろうが、Publisherは、DaprのAPIを通じてサイドカーに対してメッセージを公開するだけで済みますし、Subscriberは、DaprのAPIを通じてサイドカーからメッセージを受け取るだけで済みます。これが、DaprのPublish and Subscribeコンポーネントのメリットの一つである、インフラレイヤーの抽象化による柔軟性の高さです。 ファイル構成 Publish and Subscribeコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ( https://github.com/noriyukitakei/dapr-sample )の中の、PubSubディレクトリの中に配置されています。 PubSub ├── Infrastructure │ └── components │ └── pubsub.yaml ├── publisher │ ├── app.py │ └── requirements.txt └── subscriber ├── app.py └── requirements.txt ソースコードの説明 ■ Infrastructure/components/pubsub.yaml Infrastructure/components/pubsub.yamlは、DaprのPublish and SubscribeコンポーネントをRedisに接続するための設定ファイルです。このファイルには、DaprがRedisに接続するための情報が記載されています。 apiVersion : dapr.io/v1alpha1 # Daprのリソース定義のバージョン kind : Component # 設定ファイルの種別 metadata : name : orderpubsub # アプリから参照するコンポーネント名 spec : type : pubsub.redis # コンポーネントの種別 version : v1 # コンポーネントのバージョン metadata : - name : redisHost # Redisのホスト名とポート value : localhost : 6379 - name : redisPassword # Redisのパスワード value : "" StateManagementと同じところは説明を省略します。ここで新たに説明する必要があるのは、spec.typeがpubsub.redisになっていることです。これにより、DaprはPublish and SubscribeコンポーネントとしてRedisを使用するようになります。 順番が前後しますが、metadata.nameが「orderpubsub」であることも重要です。これにより、アプリケーションは「orderpubsub」という名前のPublish and Subscribeコンポーネントを参照して、メッセージを公開したり、メッセージを受け取ったりすることができます。 ■ publisher/app.py publisher/app.pyは、DaprのPublish and Subscribeコンポーネントを利用してメッセージを公開するためのアプリケーションコードです。このコードは、 トピックにメッセージを発行する for i in range ( 1 , 10 ) : order = { "orderId" : i } requests . post ( "http://localhost:3613/v1.0/publish/orderpubsub/orders" , json = order ) logging . info ( "送信データ: " + json . dumps ( order ) ) time . sleep ( 1 ) このコードは、1から9までの数字をorderIdとするJSONオブジェクト(以下参照)を作成し、DaprのPublish and Subscribe APIにPOSTリクエストを送っています。 { "orderId" : 1 } リクエストのURLには、先程のコンポーネント定義で指定した「orderpubsub」が含まれています。これにより、Daprは「orderpubsub」という名前のPublish and Subscribeコンポーネントを参照して、Redisにメッセージを公開します。リクエストのURLの構成は以下の通りです。 http://localhost:3613: Daprサイドカーのエンドポイント /v1.0: Dapr APIのバージョン /publish: Publish and Subscribe APIを呼び出すことを示すパス /orderpubsub: 先程のコンポーネント定義で指定した「orderpubsub」という名前のPublish and Subscribeコンポーネントを参照するためのパス /orders: メッセージのトピックを指定するためのパス トピックとは、メッセージを分類するための名前のことです。Publisherは、メッセージを公開するときに、どのトピックに公開するかを指定します。そして、Subscriberは、どのトピックからメッセージを受け取るかを指定します。これにより、PublisherとSubscriberは、特定のトピックに対してメッセージを公開したり、受け取ったりすることができます。 ■ subscriber/app.py subscriber/app.pyは、DaprのPublish and Subscribeコンポーネントを利用してメッセージを受け取るためのアプリケーションコードです。 ソースコードの説明に入る前に、Subscriberがメッセージを受信するまでの流れを説明します。 まず、Subscriberのサイドカーは、/dapr/subscribeエンドポイントを通じて、Subscriberがどのトピックからメッセージを受け取るかをDaprに通知します。 Subscriberのサイドカーは、Redisからメッセージを受け取ると、Subscriberのエンドポイントである「/トピック名」(この例では/orders)に対して、メッセージを送信します。 では、上記を踏まえてソースコードの主要な部分を説明します。 from flask import Flask , request , jsonify import json app = Flask ( __name__ ) 最初にFlaskをインポートして、Flaskアプリケーションのインスタンスを作成しています。Flaskは、PythonでWebアプリケーションを作成するためのフレームワークです。 先ほど説明したように、Subscriberのサイドカーは、Subscriberの特定のHTTPエンドポイントに対してアクセスしてくるので、SubscriberはHTTPサーバーを立てる必要があります。Flaskは、そのHTTPサーバーを簡単に立てることができるため、ここではFlaskを使用しています。Flask以外のHTTPサーバーフレームワークを使用しても問題ありません。 @app . route ( "/dapr/subscribe" , methods = [ "GET" ] ) def subscribe ( ) : subscriptions = [ { "pubsubname" : "orderpubsub" , "topic" : "orders" , "route" : "orders" } ] return jsonify ( subscriptions ) このコードは、Subscriberのサイドカーが/dapr/subscribeエンドポイントにアクセスしたときに呼び出される関数を定義しています。この関数は、Subscriberがどのトピックからメッセージを受け取るかをDaprに通知するためのものです。 つまり、この関数は、Daprに対して、Subscriberが「orderpubsub」という名前のPublish and Subscribeコンポーネントの「orders」というトピックからメッセージを受け取ることを通知しています。そして、Daprは、Subscriberのサイドカーが「orders」というエンドポイントに対して、メッセージを送信するようになります。 @app . route ( "/orders" , methods = [ "POST" ] ) def orders_subscriber ( ) : event_orderid = request . json [ "data" ] [ "orderId" ] print ( "受信データ: " + json . dumps ( event_orderid ) , flush = True ) return json . dumps ( { "success" : True } ) , 200 , { "ContentType" : "application/json" } このコードは、Subscriber側のサイドカーから「/orders」エンドポイントにリクエストが送られたときに実行される関数を定義しています。 このエンドポイントは、/dapr/subscribe エンドポイントでDaprに登録した「orders」トピックに対応しており、そのトピックにメッセージが発行されると、Daprのサイドカーによってこの関数が呼び出されます。 つまり、この関数は「orders」トピックに発行されたメッセージを受信し、処理するためのエンドポイントとして動作します。 この関数の中では、リクエストのJSONボディからorderIdを抽出し、それをコンソールに出力しています。そして、最後に、HTTPレスポンスとして、成功を示すJSONオブジェクトを返しています。 app.run(port=6104) このコードは、Flaskアプリケーションをポート6104で起動しています。これにより、Subscriberはポート番号6104でHTTPリクエストを受け付けるようになります。Publisherがメッセージを公開すると、Daprのサイドカーはこのポートに対してリクエストを送るため、Subscriberはこのポートでリクエストを受け取る必要があります。 実行方法 では次に、Publish and Subscribeコンポーネントを利用するためのアプリケーションコードを実行してみましょう。 まずは必要なライブラリをインストールします。 $ pip install -r publisher/requirements.txt $ pip install -r subscriber/requirements.txt 次に、Dapr CLIを使用してSubscriber及びSubscriberのサイドカープロセスを起動します。 $ dapr run --app-id pubsub --app-port 6104 --dapr-http-port 3614 --components-path Infrastructure/components -- python subscriber/app.py 上記のコマンドの詳細を説明します。 –app-idオプションでアプリケーションのIDを指定します。ここでは「pubsub」というIDを指定しています。このIDは、Publish and Subscribeでは特に利用しませんが、Service Invocationなどの機能を利用する際に、このIDが重要になります。 –app-portオプションでSubscriberのアプリケーションのポート番号を指定します。ここでは6104を指定しています。これにより、Subscriberはlocalhost:6104で待ち受けるようになります。 –dapr-http-portオプションでSubscriberのDaprサイドカーのHTTPポートを指定します。ここでは3614を指定しています。これにより、Daprサイドカーはlocalhost:3614で待ち受けるようになります。 –components-pathオプションで、Daprのコンポーネント定義ファイルが配置されているディレクトリを指定します。ここではInfrastructure/componentsを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。 — python subscriber/app.pyは、SubscriberのDaprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、subscriber/app.pyを実行するように指定しています。つまり、Subscriberのアプリケーションを実行するように指定しています。 次に、Publisher及びPublisherのサイドカープロセスを起動します。 $ dapr run --app-id pubsub --dapr-http-port 3613 --components-path Infrastructure/components -- python publisher/app.py 上記のコマンドの詳細を説明します。 –app-idオプションでアプリケーションのIDを指定します。ここでは「pubsub」というIDを指定しています。このIDは、Publish and Subscribeでは特に利用しませんが、Service Invocationなどの機能を利用する際に、このIDが重要になります。 –dapr-http-portオプションでPublisherのDaprサイドカーのHTTPポートを指定します。ここでは3613を指定しています。これにより、Daprサイドカーはlocalhost:3613で待ち受けるようになります。 –components-pathオプションで、Daprのコンポーネント定義ファイルが配置されているディレクトリを指定します。ここではInfrastructure/componentsを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。 — python publisher/app.pyは、PublisherのDaprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、publisher/app.pyを実行するように指定しています。つまり、Publisherのアプリケーションを実行するように指定しています。 このコマンドを実行すると、Publisherがメッセージを公開し、Subscriberがそのメッセージを受信する様子を確認することができます。Subscriberのコンソールには、Publisherが公開したメッセージが以下のように表示されるはずです。 == APP == 受信データ: 1 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - == APP == 受信データ: 2 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - == APP == 受信データ: 3 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - == APP == 受信データ: 4 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - == APP == 受信データ: 5 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - == APP == 受信データ: 6 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - == APP == 受信データ: 7 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - == APP == 受信データ: 8 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - == APP == 受信データ: 9 == APP == 127.0 .0.1 - - [ 28 /Mar/2026 01:52:45 ] "POST /orders HTTP/1.1" 200 - Bindings Bindingsコンポーネントとは、アプリに入ってくるデータと、アプリから出ていくデータを処理するためのコンポーネントです。 システム構成 例えば、RabbitMQにデータが入ってきたときに、そのデータを処理して、処理した結果をPostgreSQLに保存したいとします。このような場合に、Bindingsコンポーネントが役立ちます。 前のコンポーネントと同じように、Bindingsコンポーネントも、アプリ側はRabbtiMQやPostgreSQLのことを知らなくても、DaprのAPIを通じて、RabbitMQからデータを受け取ったり、PostgreSQLにデータを保存したりすることができます。これも、Daprのメリットの一つである、インフラレイヤーの抽象化による柔軟性の高さです。 ① Publisher(キューにメッセージを登録する側)が、RabbitMQに対して、メッセージを登録します。 ② RabbitMQは、サイドカーにメッセージを登録します。 ③ サイドカーは、RabbitMQからメッセージを受け取ると、アプリが待ち受けている特定のHTTPエンドポイントに対して、メッセージを送信します。 ④ アプリは、そのHTTPエンドポイントでリクエストを受け取ると、処理を実行します。処理が完了したら、アプリは、DaprのAPIを通じて、サイドカーに対して、処理した結果を保存するリクエストを送ります。 ⑤ サイドカーは、コンポーネント定義ファイルを読み取り、PostgreSQLに対して、処理した結果を保存するリクエストを送ります。 サイドカーにデータが入っていくほうをInput Binding、サイドカーからデータが出ていくほうをOutput Bindingと呼びます。 Input Bindingは、ここで紹介したRabbitMQを始め、Azure Event HubsやAWS Kinesisなどのメッセージングサービス、HTTPやgRPCなどのプロトコル、ファイルシステムやFTPなどのストレージサービスなど、様々なものが用意されています。Output Bindingも、ここで紹介したPostgreSQLを始め、Azure Cosmos DBやAWS DynamoDBなどのデータベースサービス、HTTPやgRPCなどのプロトコル、ファイルシステムやFTPなどのストレージサービスなど、様々なものが用意されています。詳細は以下の公式ドキュメントを参照してください。 https://docs.dapr.io/reference/components-reference/supported-bindings/ ファイル構成 Bindingsコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ( https://github.com/noriyukitakei/dapr-sample )の中の、Bindingsディレクトリの中に配置されています。 Bindings ├── app.py ├── Infrastructure │ ├── components │ │ ├── binding-mq.yaml │ │ └── binding-sqldb.yaml │ └── db │ ├── docker-compose.yml │ ├── Dockerfile │ └── temperatures.sql ├── publish_temperatures.py └── requirements.txt ソースコードの説明 ■ Infrastructure/components/binding-mq.yaml apiVersion : dapr.io/v1alpha1 # Daprのリソース定義のバージョン kind : Component # 設定ファイルの種別 metadata : name : mq # アプリから参照するコンポーネント名 spec : type : bindings.rabbitmq # コンポーネントの種別 metadata : - name : host # RabbitMQのホスト名とポート value : "amqp://guest:guest@localhost:5672" - name : queueName # RabbitMQのキュー名 value : "dapr-queue" - name : direction # データの流れを指定するためのメタデータ value : "input" 他のコンポーネントと同じところは説明を省略します。ここで新たに説明する必要があるのは、spec.typeがbindings.rabbitmqになっていることです。これにより、DaprはBindingsコンポーネントとしてRabbitMQを使用するようになります。 順番が前後しますが、metadata.nameが「mq」であることも重要です。これにより、アプリケーションは「mq」という名前のBindingsコンポーネントを参照して、メッセージを受信したり、メッセージを送信したりすることができます。 spec.metadataは、RabbitMQに接続するための情報を記載しています。RabbitMQのホスト名とポートを指定するためのhost、RabbitMQのキュー名を指定するためのqueueName、データの流れを指定するためのdirectionという3つのメタデータを定義しています。directionは、データの流れを指定するためのメタデータであり、inputを指定すると、サイドカーはRabbitMQからメッセージを受信するためのBindingsコンポーネントとして動作します。 ■ Infrastructure/components/binding-sqldb.yaml apiVersion : dapr.io/v1alpha1 # Daprのリソース定義のバージョン kind : Component # 設定ファイルの種別 metadata : name : sqldb # アプリから参照するコンポーネント名 spec : type : bindings.postgres # コンポーネントの種別 version : v1 metadata : - name : url # PostgreSQLの接続情報 value : "user=postgres password=docker host=localhost port=5432 dbname=temperatures" - name : direction # データの流れを指定するためのメタデータ value : "output" このファイルは、DaprのBindingsコンポーネントをPostgreSQLに接続するための設定ファイルです。このファイルには、DaprがPostgreSQLに接続するための情報が記載されています。 spec.typeがbindings.postgresになっていることにより、DaprはBindingsコンポーネントとしてPostgreSQLを使用するようになります。 metadata.nameが「sqldb」であることも重要です。これにより、アプリケーションは「sqldb」という名前のBindingsコンポーネントを参照して、メッセージを受信したり、メッセージを送信したりすることができます。 spec.metadataは、PostgreSQLに接続するための情報を記載しています。PostgreSQLの接続情報を指定するためのurl、データの流れを指定するためのdirectionという2つのメタデータを定義しています。directionは、データの流れを指定するためのメタデータであり、outputを指定すると、サイドカーはPostgreSQLにメッセージを送信するためのBindingsコンポーネントとして動作します。 ■ db/docker-compose.yml このファイルは、PostgreSQLをDockerコンテナで起動するためのdocker-composeファイルです。このファイルを使用して、PostgreSQLを簡単に起動することができます。 ■ db/Dockerfile このファイルは、PostgreSQLのDockerイメージを作成するためのDockerfileです。このファイルを使用して、PostgreSQLのDockerイメージを作成することができます。 FROM postgres COPY temperatures.sql /docker-entrypoint-initdb.d/ このDockerfileは、公式のPostgreSQLイメージをベースにしています。そして、temperatures.sqlというSQLファイルを、PostgreSQLの初期化スクリプトが配置されるディレクトリである/docker-entrypoint-initdb.d/にコピーしています。これにより、PostgreSQLが起動するときに、このSQLファイルが実行されて、temperaturesテーブルが作成されます。 ■ db/temperatures.sql このファイルは、PostgreSQLの初期化スクリプトです。このファイルには、PostgreSQLが起動するときに実行されるSQL文が記載されています。ここでは、temperaturesテーブルを作成するSQL文が記載されています \c temperatures ; create table temperatures ( sensorid text , timestamp timestamptz , temperature float ) ; select * from temperatures ; このSQL文は、まずtemperaturesデータベースに接続するための\c temperatures;というコマンドを実行しています。そして、temperaturesテーブルを作成するためのcreate table文を実行しています。最後に、temperaturesテーブルの中身を確認するためのselect文を実行しています。 ■ publish_temperatures.py このファイルは、RabbitMQに対して、温度センサーのデータを送信するためのアプリケーションコードです。 def publish ( ) - > None : data = { "sensorid" : "sensor-1" , "timestamp" : "2026-03-07T10:00:00Z" , "temperature" : 22.5 , } まず、関数publishを定義しています。この関数は、RabbitMQに対して、温度センサーのデータを送信するためのものです。 このコードは、温度センサーのデータを表すJSONオブジェクトを作成しています。このJSONオブジェクトには、sensorid、timestamp、temperatureという3つのフィールドが含まれています。sensoridは、センサーのIDを表す文字列です。timestampは、センサーのデータが記録された日時を表す文字列です。temperatureは、センサーのデータである温度を表す数値です。 params = pika . URLParameters ( "amqp://guest:guest@localhost:5672" ) connection = pika . BlockingConnection ( params ) channel = connection . channel ( ) このコードは、pikaライブラリを使用して、RabbitMQに接続しています。pika.URLParametersを使用して、RabbitMQの接続情報を指定しています。そして、pika.BlockingConnectionを使用して、RabbitMQに接続しています。最後に、connection.channel()を使用して、RabbitMQのチャネルを作成しています。 channel . queue_declare ( queue = "dapr-queue" ) このコードは、RabbitMQのチャネルを使用して、dapr-queueという名前のキューを宣言しています。これにより、dapr-queueという名前のキューがRabbitMQに作成されます。 body = json . dumps ( data ) channel . basic_publish ( exchange = "" , routing_key = "dapr-queue" , body = body , properties = pika . BasicProperties ( delivery_mode = 2 ) , ) このコードは、RabbitMQのチャネルを使用して、dapr-queueという名前のキューに対して、温度センサーのデータを送信しています。exchangeには空文字列を指定することで、デフォルトのエクスチェンジを使用しています。routing_keyには、dapr-queueという名前のキューを指定しています。bodyには、温度センサーのデータをJSON形式で表した文字列を指定しています。そして、propertiesには、メッセージのプロパティを指定しています。ここでは、delivery_mode=2を指定することで、メッセージが永続化されるようにしています。 connection . close ( ) このコードは、RabbitMQへの接続を閉じています。 ■ app.py このファイルは、DaprのBindingsコンポーネントを利用して、RabbitMQからデータを受け取って、PostgreSQLにデータを保存するためのアプリケーションコードです。 app = Flask ( __name__ ) Triggered by Dapr input binding @app . route ( "/mq" , methods = [ "POST" ] ) def process_batch ( ) : このコードは、Flaskを使用して、HTTPサーバーを立てています。そして、/mqというエンドポイントに対してPOSTリクエストが送られたときに呼び出される関数process_batchを定義しています。このエンドポイントの/mqというパスは、先程のコンポーネント定義で指定した「mq」という名前のBindingsコンポーネントを参照するためのパスです。これにより、Daprは「mq」という名前のBindingsコンポーネントを参照して、RabbitMQからメッセージを受信すると、このエンドポイントに対してリクエストを送るようになります。 data = request . get_json ( silent = True ) sql_output ( data ) print ( "Finished processing batch" , flush = True ) return json . dumps ( { "success" : True } ) , 200 , { "ContentType" : "application/json" } 関数process_batchの中身です。まず、リクエストのJSONボディを取得しています。そして、sql_output関数を呼び出して、RabbitMQから受け取ったデータをPostgreSQLに保存しています。最後に、HTTPレスポンスとして、成功を示すJSONオブジェクトを返しています。 def sql_output ( reading ) : # expected keys: sensorid, timestamp, temperature sensorid = reading . get ( "sensorid" ) timestamp = reading . get ( "timestamp" ) temperature = reading . get ( "temperature" ) 先程の関数process_batchの中で呼び出されているsql_output関数です。この関数は、RabbitMQから受け取ったデータをPostgreSQLに保存するためのものです。まず、関数sql_outputを定義しています。この関数は、RabbitMQから受け取ったデータを表すJSONオブジェクトを引数として受け取ります。そして、そのJSONオブジェクトから、sensorid、timestamp、temperatureという3つのフィールドを取得しています。 sqlCmd = ( "insert into temperatures (sensorid, timestamp, temperature) values " + "('%s', '%s', %s)" % ( sensorid , timestamp , temperature ) ) このコードは、PostgreSQLに対して実行するSQL文を作成しています。ここでは、temperaturesテーブルに対して、sensorid、timestamp、temperatureの値を挿入するためのinsert文を作成しています。 payload = { "operation" : "exec" , "metadata" : { "sql" : sqlCmd } } このコードは、DaprのAPIに送るリクエストを定義しています。リクエストの形式は、利用するコンポーネントによって異なるのですが、PostgreSQLは以下の形式になります。 { "operation" : "exec" , "metadata" : { "sql" : "INSERT INTO foo (id, c1, ts) VALUES ($1, $2, $3)" , "params" : "[1, \"demo\", \"2020-09-24T11:45:05Z07:00\"]" } } このリクエストは、operationにexecを指定することで、SQL文を実行することを示しています。そして、metadataの中に、sqlというフィールドを定義して、その中に実行するSQL文を指定しています。ここでは、先程作成したsqlCmdという変数に格納されているSQL文を指定しています。 paramsというフィールドも定義されているのですが、今回は使用しません。paramsは、SQL文の中でプレースホルダを使用する場合に、そのプレースホルダに対応する値を指定するためのフィールドです。今回は、SQL文の中でプレースホルダを使用していないため、paramsは必要ありません。 よって今回送付するリクエストは以下のとおりとなります。 { "operation" : "exec" , "metadata" : { "sql" : "INSERT INTO temperatures (sensorid, timestamp, temperature) VALUES ('sensor-1', '2026-03-07T10:00:00Z', 22.5)" } } resp = requests . post ( "http://localhost:3617/v1.0/bindings/sqldb" , json = payload ) return resp requestsライブラリを使用して、DaprのAPIに対してリクエストを送っています。リクエストのURLには、先程のコンポーネント定義で指定した「sqldb」が含まれています。これにより、Daprは「sqldb」という名前のBindingsコンポーネントを参照して、PostgreSQLに対してSQL文を実行するようになります。リクエストのURLの構成は以下の通りです。リクエストボディには、先程作成したpayloadという変数に格納されているJSONオブジェクトを指定しています。 http://localhost:3617: Daprサイドカーのエンドポイント /v1.0: Dapr APIのバージョン /bindings: Bindings APIを呼び出すことを示すパス /sqldb: 先程のコンポーネント定義で指定した「sqldb」という名前のBindingsコンポーネントを参照するためのパス app.run(port=6107) Flaskアプリケーションをポート6107で起動しています。これにより、アプリはlocalhost:6107でHTTPリクエストを受け付けるようになります。RabbitMQからメッセージが送られてくると、Daprのサイドカーはこのポートに対してリクエストを送るため、アプリはこのポートでリクエストを受け取る必要があります。 実行方法 まずは必要なライブラリをインストールします。 $ pip install -r requirements.txt 次に、PostgreSQLとRabbbitMQのコンテナを起動します。 $ cd Infrastructure/db $ docker-compose up -d Dapr CLIを使用してアプリケーションコードとサイドカープロセスを起動します。 $ dapr run --app-id bindings --app-port 6107 --dapr-http-port 3617 --components-path Infrastructure/components -- python app.py 上記のコマンドの詳細を説明します。 –app-idオプションでアプリケーションのIDを指定します。ここでは「bindings」というIDを指定しています。このIDは、Bindingsコンポーネントでは特に利用しませんが、Service Invocationなどの機能を利用する際に、このIDが重要になります。 –app-portオプションでアプリケーションのポート番号を指定します。ここでは6107を指定しています。これにより、アプリはlocalhost:6107で待ち受けるようになります。 –dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3617を指定しています。これにより、Daprサイドカーはlocalhost:3617でHTTPリクエストを受け付けるようになります。 –components-pathオプションでDaprコンポーネントの定義ファイルが格納されているディレクトリを指定します。ここではInfrastructure/componentsを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。 — python app.pyは、Daprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、app.pyを実行するように指定しています。つまり、アプリケーションコードを実行するように指定しています。 このコマンドを実行した状態で、publish_temperatures.pyを実行してみましょう。 $ python publish_temperatures.py このコードを実行すると、RabbitMQに温度センサーのデータが送信されます。そして、Daprのサイドカーがそのデータを受け取って、アプリケーションコードに送ります。アプリケーションコードは、そのデータをPostgreSQLに保存します。 本当にデータがPostgreSQLに保存されたかを確認してみましょう。以下のコマンドを実行して、PostgreSQLのコンテナに接続します。 $ docker exec -i postgres psql --username postgres --dbname temperatures -c "select * from temperatures;" sensorid | timestamp | temperature ----------+------------------------+------------- sensor-1 | 2026 -03-07 10 :00:00+00 | 22.5 ( 1 row ) すると、temperaturesテーブルの中に、先程publish_temperatures.pyを実行したときに送信したデータが保存されていることが確認できます。 Service Invocation Service Invocationは、Daprの機能の一つであり、アプリケーションが他のアプリケーションに対してリクエストを送るときに、便利な機能がたくさんあります。 アプリケーションは、他のアプリケーションのこと(IPアドレスなど)を知らなくても、DaprのAPIを通じて、他のアプリケーションに対してリクエストを送ることができます。 アプリケーション同士の通信が失敗しても、サイドカーが自動的にリトライしてくれるため、通信の信頼性が高まります。 サイドカーが、アプリケーション同士の通信をmTLSによって暗号化してくれるため、通信のセキュリティが高まります。 サイドカーが、アプリケーション同士の通信をモニターして、ログやメトリクスなどを収集してくれるため、通信の可観測性が高まります。 つまり、サイドカーがいろんなことをしてくれるので、アプリケーション側では何も考えずに、DaprのAPIを通じて、他のアプリケーションに対してリクエストを送ることができます。これが、Service Invocationの大きなメリットの一つです。 様々な利便性があるService Invocationですが、1の機能に焦点をあてて、具体的な事例(センサーデータを送るアプリケーションから、センサーのデータを受け取って処理するアプリケーションに対してリクエストを送る)を通じて、Service Invocationの使い方を説明していきます。 システム構成 ということで、センサーデータを送るアプリケーションから、センサーのデータを受け取って処理するアプリケーションに対してリクエストを送るシステムを例に挙げて説明します。構成図は以下のとおりです。 ① アプリケーションIDが「telemetry-sender」のアプリケーションは、DaprのAPIを使って、アプリケーションIDが「telemetry-collector」のアプリケーションにリクエストを送ります。 このときのURLは http://localhost:3615/v1.0/invoke/telemetry-collector/method/telemetry です。 ここで「3615」は telemetry-sender 側のDaprサイドカーのHTTPポートを表しています。 また、このURLのパスは「telemetry-collector」というアプリケーションの「telemetry」というエンドポイントを呼び出すことを意味します。 ② サイドカーはこのリクエストを受け取ると、mDNS(multicast DNS)を使って、アプリケーションID「telemetry-collector」がどこで動いているか(IPアドレス)を調べます。 ③ すると、telemetry-collector 側のサイドカー内のmDNSリゾルバーが応答し、「自分が telemetry-collector である」と名乗って、自身のIPアドレスなどの情報を返します。 ④ telemetry-sender 側のサイドカーは、その情報をもとに telemetry-collector のサイドカーへリクエストを送ります。 ⑤ telemetry-collector 側のサイドカーは、受け取ったリクエストを実際のアプリケーション(telemetry-collector)に転送します。 このように、アプリケーションID「telemetry-sender」のアプリケーションは、アプリケーションID「telemetry-collector」のアプリケーションのIPアドレスやポート番号を知らなくても、DaprのAPIを通じて、アプリケーションID「telemetry-collector」のアプリケーションに対してリクエストを送ることができます。これが、Service Invocationの大きなメリットの一つです。 ちなみに本記事で紹介している構成(Self-hosted)では、mDNSを使っていますが、AWSのECSやAzureのAKSなどのコンテナオーケストレーションサービスを使用している場合は、mDNSの代わりに、高度なDNSリゾルバーが使用されます。 ファイル構成 Service Invocationコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ( https://github.com/noriyukitakei/dapr-sample )の中の、ServiceInvocationディレクトリの中に配置されています。 ServiceInvocation ├── telemetry-collector │ ├── app.py │ └── requirements.txt └── telemetry-sender ├── app.py └── requirements.txt ソースコードの説明 ■ telemetry-collector/app.py app = Flask ( __name__ ) このコードは、Flaskを使用して、HTTPサーバーを立てています。 @app . route ( "/telemetry" , methods = [ "POST" ] ) def telemetry ( ) : data = request . json print ( "Collector received:" , data , flush = True ) return ( json . dumps ( { "ok" : True } ) , 200 , { "ContentType" : "application/json" } , ) このコードは、/telemetryというエンドポイントに対してPOSTリクエストが送られたときに呼び出される関数を定義しています。このエンドポイントは、先程の構成図の中で、アプリケーションID「telemetry-sender」のアプリケーションが呼び出すエンドポイントになります。 アプリケーションID「telemetry-sender」が呼び出すエンドポイントは、先ほど説明した通り、 http://localhost:3615/v1.0/invoke/telemetry-collector/method/telemetry になります。このURLのパスの最後の部分である「telemetry」が、telemetry-collector 側のアプリケーションのエンドポイントになります。なので、telemetry-collector 側のアプリケーションは、このエンドポイントを定義する必要があります。 telemetry関数の中では、リクエストのJSONボディを取得して、コンソールに出力しています。そして、HTTPレスポンスとして、成功を示すJSONオブジェクトを返すという非常にシンプルな処理をしています。 app . run ( port = 6106 ) Flaskアプリケーションをポート6106で起動しています。これにより、アプリはlocalhost:6106でHTTPリクエストを受け付けるようになります。telemetry-senderからリクエストが送られてくると、Daprのサイドカーはこのポートに対してリクエストを送るため、アプリはこのポートでリクエストを受け取る必要があります。 ■ telemetry-sender/app.py payload = { "sensorId" : "sensor-1" , "temperatureC" : 23 } このコードは、telemetry-collectorに送るデータを表すJSONオブジェクトを作成しています。このJSONオブジェクトには、sensorIdとtemperatureCという2つのフィールドが含まれています。センサーのIDを表すsensorIdと、センサーのデータである温度を表すtemperatureCです。 response = requests . post ( url = "http://127.0.0.1:3615/v1.0/invoke/telemetry-collector/method/telemetry" , data = json . dumps ( payload ) , headers = { "content-type" : "application/json" , } , ) このコードは、requestsライブラリを使用して、DaprのAPIに対してリクエストを送っています。リクエストのURLには、先程の構成図の中で、アプリケーションID「telemetry-sender」のアプリケーションが呼び出すURLである http://localhost:3615/v1.0/invoke/telemetry-collector/method/telemetry が指定されています。リクエストボディには、先程作成したpayloadという変数に格納されているJSONオブジェクトを指定しています。 実行方法 まずは必要なライブラリをインストールします。 $ pip install -r telemetry-sender/requirements.txt $ pip install -r telemetry-collector/requirements.txt 次に、telemetry-collectorのアプリケーションコードとサイドカープロセスを起動します。 $ dapr run --app-id telemetry-collector --app-port 6106 --dapr-http-port 3616 -- python telemetry-collector/app.py 上記のコマンドの詳細を説明します。 –app-idオプションでアプリケーションのIDを指定します。ここでは「telemetry-collector」というIDを指定しています。このIDは、Service Invocationでは非常に重要になります。なぜなら、アプリケーションID「telemetry-sender」のアプリケーションが、アプリケーションID「telemetry-collector」のアプリケーションにリクエストを送るときに、このIDを使用して、どのアプリケーションにリクエストを送るかを指定するからです。このアプリケーションIDを元にして、Daprのサイドカーは、mDNSを使って、アプリケーションID「telemetry-collector」がどこで動いているか(IPアドレス)を調べます。 –app-portオプションでアプリケーションのポート番号を指定します。ここでは6106を指定しています。これにより、アプリはlocalhost:6106で待ち受けるようになります。 –dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3616を指定しています。これにより、Daprサイドカーはlocalhost:3616でHTTPリクエストを受け付けるようになります。 — python telemetry-collector/app.pyは、Daprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、app.pyを実行するように指定しています。つまり、アプリケーションコードを実行するように指定しています。 次に、telemetry-senderのアプリケーションコードとサイドカープロセスを起動します。 $ dapr run --app-id telemetry-sender --dapr-http-port 3615 -- python telemetry-sender/app.py 上記のコマンドの詳細を説明します。 –app-idオプションでアプリケーションのIDを指定します。ここでは「telemetry-sender」というIDを指定しています。 –dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3615を指定しています。これにより、Daprサイドカーはlocalhost:3615でHTTPリクエストを受け付けるようになります。 — python telemetry-sender/app.pyは、Daprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、app.pyを実行するように指定しています。つまり、アプリケーションコードを実行するように指定しています。 このコマンドを実行すると、telemetry-sender側のコンソールに以下のような出力がされます。 == APP == Sender sent: {'sensorId': 'sensor-1', 'temperatureC': 23} == APP == Response status: 200 そして、telemetry-collector側のコンソールには以下のような出力がされます。 == APP == Collector received: {'sensorId': 'sensor-1', 'temperatureC': 23} == APP == 127.0.0.1 - - [03/Apr/2026 13:53:42] "POST /telemetry HTTP/1.1" 200 - 上記のようなものが出力されていれば、telemetry-senderのアプリケーションが、telemetry-collectorのアプリケーションに対してリクエストを送ることができていることが確認できます。 Secrets Management Secrets Managementは、Daprの機能の一つであり、アプリケーションがシークレットを安全に管理するための機能です。クラウドのサービスで、シークレットを扱うサービスはたくさんあります。AWSのSecrets ManagerやAzureのKey Vaultなどが有名です。これらのサービスを利用することで、シークレットを安全に管理することができます。 ただし、これらのサービスを使うためには、それぞれの専用のAPIを呼び出したり、SDKを使用したりする必要があります。これらのサービスを利用するためのコードを書くのは、面倒なことが多いです。DaprのSecrets Managementを利用することで、これらのサービスを利用するためのコードを書く必要がなくなります。DaprのAPIを通じて、シークレットを取得することができるようになります。 アプリからはDaprのサイドカーに、「シークレットを取得したい」というリクエストを送ります。すると、サイドカーは、あらかじめ設定されているシークレットストアに対して、シークレットを取得するためのリクエストを送ります。そして、シークレットストアからシークレットが返ってくると、サイドカーは、そのシークレットをアプリに返します。 システム構成 では、シークレットを取得するアプリケーションの例を通じて、Secrets Managementの使い方を説明していきます。構成図は以下のとおりです。 今回ご紹介する例では、AWS Secrets ManagerやAzure Key Vaultではなく、ローカルに配置したJSONファイルをシークレットストアとして使用します。Daprは、ローカルに配置したJSONファイルをシークレットストアとして使用することができます。これにより、AWS Secrets ManagerやAzure Key Vaultなどのクラウドのサービスを利用しなくても、Secrets Managementの機能を試すことができます。 ローカルに配置したJSONファイルをシークレットストアとして使用する方式は、Daprの公式ドキュメントにおいては本番環境では推奨されていません。今回の説明のために、ローカルに配置したJSONファイルをシークレットストアとして使用する方式を紹介していますが、実際のプロジェクトでSecrets Managementの機能を利用する際には、AWS Secrets ManagerやAzure Key Vaultなどのクラウドのサービスを利用することをおすすめします。 ということで先ほどの構成図に基づいて、シークレットを取得するアプリケーションの例を通じて、Secrets Managementの使い方を説明していきます。 ① アプリケーションは、DaprのAPIを使って、シークレットを取得するためのリクエストをサイドカーに送ります。 このときのURLは http://127.0.0.1:3618/v1.0/secrets/local-secretstore/my-secret です。このURLの構成を説明します。 http://127.0.0.1:3618 : DaprサイドカーのHTTPポート /v1.0/secrets: シークレット管理のAPIエンドポイント /local-secretstore: 使用するシークレットストアの名前( コンポーネント定義で指定したmetadata.name ) /my-secret: 取得したいシークレットの名前 ② サイドカーはこのリクエストを受け取ると、あらかじめ設定されているシークレットストアに対して、シークレットを取得するためのリクエストを送ります。 ファイル構成 Secrets Managementコンポーネントを利用するためのファイル構成は以下の通りとなります。これらのファイルは先程紹介したGitHubリポジトリ( https://github.com/noriyukitakei/dapr-sample )の中の、SecretsManagementディレクトリの中に配置されています。 SecretsManagement ├── app.py ├── Infrastructure │ └── components │ ├── local-secretstore.yaml │ └── secrets.json ├── README.md └── requirements.txt ソースコードの説明 では、Secrets Managementの機能を利用するためのソースコードを説明していきます。 ■ Infrastructure/components/local-secretstore.yaml apiVersion : dapr.io/v1alpha1 # DaprのAPIバージョン kind : Component # コンポーネントの種類 metadata : name : local - secretstore # コンポーネントの名前 spec : type : secretstores.local.file # コンポーネントの種類を指定 version : v1 metadata : - name : secretsFile # シークレットストアとして使用するJSONファイルのパスを指定するためのメタデータ value : "./Infrastructure/components/secrets.json" # シークレットストアとして使用するJSONファイルのパス このファイルは、DaprのSecrets ManagementコンポーネントをローカルのJSONファイルをシークレットストアとして使用するように設定するためのコンポーネント定義ファイルです。 metadata.nameは、アプリケーションがこのコンポーネントを参照するための名前になります。先ほどご紹介したシステム構成の中で、アプリケーションが送るリクエストのURLは http://127.0.0.1:3618/v1.0/secrets/local-secretstore/my-secret でした。このURLの中の「local-secretstore」という部分は、metadata.nameで指定した名前になります。つまり、metadata.nameが「local-secretstore」であることによって、アプリケーションは「local-secretstore」という名前のコンポーネント定義ファイルを参照して、シークレットストアとして使用するJSONファイルのパスを知ることができるようになります。 spec.typeは、コンポーネントの種類を指定するためのフィールドです。ここでは、Daprが提供しているローカルのJSONファイルをシークレットストアとして使用するためのコンポーネントであるsecretstores.local.fileを指定しています。Azure Key Vaultの場合はsecretstores.azure.keyvault、AWS Secrets Managerの場合はsecretstores.aws.secretsmanagerを指定します。 metadataの中には、spec.typeで指定したコンポーネントの種類に応じたメタデータを定義します。ローカルのJSONファイルをシークレットストアとして使用するためのコンポーネントであるsecretstores.local.fileの場合は、secretsFileという名前のメタデータを定義して、そのvalueにシークレットストアとして使用するJSONファイルのパスを指定します。 ■ Infrastructure/components/secrets.json { "my-secret" : "hello-from-file" } このファイルは、ローカルに配置したJSONファイルをシークレットストアとして使用するためのJSONファイルです。このファイルには、シークレットの名前と値のペアが定義されています。ここでは、my-secretという名前のシークレットに対して、hello-from-fileという値が定義されています。 ■ app.py URL = "http://127.0.0.1:3618/v1.0/secrets/local-secretstore/my-secret" print ( requests . get ( URL ) . text ) このコードは、DaprのAPIを使って、シークレットを取得するためのリクエストをサイドカーに送っています。リクエストのURLには、先程の構成図の中で、アプリケーションが送るリクエストのURLである http://127.0.0.1:3618/v1.0/secrets/local-secretstore/my-secret が指定されています。リクエストを送ると、サイドカーは、あらかじめ設定されているシークレットストアに対して、シークレットを取得するためのリクエストを送ります。そして、シークレットストアからシークレットが返ってくると、サイドカーは、そのシークレットをアプリに返します。最後に、取得したシークレットをコンソールに出力しています。 実行方法 まずは必要なライブラリをインストールします。 $ pip install -r requirements.txt 次に、Dapr CLIを使用してアプリケーションコードとサイドカープロセスを起動します。 $ dapr run --app-id secretsmanagement --dapr-http-port 3618 --components-path Infrastructure/components -- python app.py .. .略 .. . == APP == { "my-secret" : "hello-from-file" } 上記のコマンドの詳細を説明します。 –app-idオプションでアプリケーションのIDを指定します。ここでは「secretsmanagement」というIDを指定しています。 –dapr-http-portオプションでDaprサイドカーのHTTPポートを指定します。ここでは3618を指定しています。これにより、Daprサイドカーはlocalhost:3618でHTTPリクエストを受け付けるようになります。 –components-pathオプションでDaprコンポーネントの定義ファイルが格納されているディレクトリを指定します。ここではInfrastructure/componentsを指定しています。これにより、Daprサイドカーはこのディレクトリの中にあるコンポーネント定義ファイルを読み込むようになります。 — python app.pyは、Daprサイドカーが起動した後に実行するコマンドを指定しています。ここでは、app.pyを実行するように指定しています。つまり、アプリケーションコードを実行するように指定しています。 このコマンドを実行すると、コンソールに{“my-secret”:”hello-from-file”}と出力されます。これは、DaprのAPIを通じて、シークレットを取得するためのリクエストをサイドカーに送った結果、サイドカーがあらかじめ設定されているシークレットストアに対して、シークレットを取得するためのリクエストを送って、そのシークレットをアプリに返した結果になります。つまり、DaprのSecrets Managementの機能が正常に動作していることが確認できます。 デモアプリ ひと通り、Daprの基本的な機能であるPub/Sub、Bindings、Service Invocation、Secrets Managementについて説明してきました。これらの機能を組み合わせることで、ちょっぴり本格的なマイクロサービスを作ってみましょう。 このアプリのソースコードは以下のリポジトリで公開しております。 https://github.com/noriyukitakei/dapr-demo 機能概要 このデモアプリは、センサーから送られてくるデータをリアルタイムに処理し、可視化やアラート通知を行うシステムです。 まず、センサーのシミュレーターが温度データを生成します。このデータは一度メッセージング基盤に送られ、そこからシステム全体に配信されます。 データを受け取ったゲートウェイの役割を持つサービスは、その内容を保存するとともに、異常値かどうかを判断します。 もし異常な値であれば、別のサービスに通知され、メールなどでアラートが送信されます。 一方で、通常のデータは蓄積され、ダッシュボードサービスから参照できるようになります。ユーザーはブラウザなどからセンサーの状態を確認でき、現在の状況をリアルタイムに把握することができます。 つまりこのシステムは、 データの収集 データの保存 異常検知 通知 可視化 といった一連の処理を、複数のサービスに分けて実現しています。 システム構成 デモアプリのシステム構成は以下のとおりです。 このシステムは3つのマイクロサービスで構成されています。デバイスからセンサーを受信するSensor Gateway Service、異常値を検知してアラートを送るAlert Service、センサーの状態を可視化するDashboard Serviceです。 このデモアプリでは、センサーのシミュレーターは、Sensor Gateway Serviceに対してセンサーデータを送ります。Sensor Gateway Serviceは、そのデータを保存するとともに、異常値かどうかを判断します。もし異常な値であれば、Alert Serviceに通知され、メールなどでアラートが送信されます。一方で、通常のデータは蓄積され、Dashboard Serviceから参照できるようになります。 では、システム構成図の中の①~⑰の流れに沿って、システム全体の動きを説明していきます。 ① センサーのシミュレーターであるSensor Simulatorが、センサーデータを生成する。アラートが上がるようなある一定温度以上のデータを一定確率で生成して、MQTTに送信する。 ② MQTTに送られたデータをSensor Gateway Serviceが受信する。Sensor Gateway Serviceは、DaprのInput Bindings機能を使って、MQTTからデータを受け取るように設定されている。 ③ Sensor Gateway Serviceのサイドカーは、MQTTからデータを受け取ると、そのデータをSensor Gateway Serviceに送る。受け取るためのエンドポイントはInput Bindingsのコンポーネント定義で指定されている。 ④ Sensor Gateway Serviceのサイドカーは、Secrets Managementの機能を使って、機密情報を格納しているデータストア(今回はローカルのJSONファイル)から、アラートのしきい値などを取得する。 ⑤ Sensor Gateway Serviceは、④で受け取ったしきい値を元に、受け取ったセンサーデータが異常値かどうかを判断する。 ⑥ Sensor Gateway Serviceは、受け取ったセンサーデータをRedisに保存する。これはState Managementの機能を使って実現している。 ⑦ Sensor Gateway Serviceは、受け取ったセンサーデータが正常値であればtelemetryトピックに、異常値であればalertトピックに送信する。これはPub/Subの機能を使って実現している。 ⑧ RabbitMQは、alertトピックに送られた異常値のデータをAlert Serviceに配信する。Alert Serviceは、DaprのPub/Sub機能を使って、alertトピックからデータを受け取るように設定されている。 ⑨ Alert Serviceのサイドカーは、alertトピックからデータを受け取ると、そのデータをAlert Serviceに送る。/alertsというエンドポイントに対してPOSTリクエストを送るように設定されている。このエンドポイントは、Alert Serviceのアプリケーションコードの中で定義されている。 ⑩ Alert Serviceは、受け取ったデータを元に、メールなどでアラートを送る。Output Bindingsの機能を使って、メールを送るように設定されている。 ⑪ Alert Serviceのサイドカーは、Output Bindingsの機能を使って、指定されたSMTPサーバーに対してメールを送信する。このメールサーバーは、maildev( https://github.com/maildev/maildev )というローカルで動かすことができるメールサーバーを使用している。 ⑫ 管理者は、DashBoard Serviceが提供しているエンドポイントに対して、センサーの状態を確認するためのリクエストを送る。 ⑬ Dashboard Serviceは、Service Invocationの機能を使って、Sensor Gateway Serviceに対してリクエストを送る。リクエストの内容は、センサーの状態を確認するためのものである。リクエストのURLは、センサーのIDがsensor-001とした場合、 http://localhost:3610/v1.0/invoke/sensorgateway/method/state/sensor-001 である。このURLのパスのsensorgatewayという部分は、Sensor Gateway ServiceのアプリケーションIDである。つまり、Dashboard Serviceは、Service Invocationの機能を使って、Sensor Gateway Serviceに対してリクエストを送るときに、Sensor Gateway ServiceのアプリケーションIDを指定している。state/sensor-001という部分は、Sensor Gateway Serviceのアプリケーションコードの中で定義されているエンドポイントになる。つまり、Dashboard Serviceは、Service Invocationの機能を使って、Sensor Gateway Serviceに対してリクエストを送るときに、Sensor Gateway Serviceのアプリケーションコードの中で定義されているエンドポイントを指定している。 ⑭ DashBoard Serviceのサイドカーは、mDNSを使って、Sensor Gateway Serviceのサイドカーがどこで動いているか(IPアドレス)を調べて、Sensor Gateway Serviceのサイドカーにリクエストを送る。 ⑮ Sensor Gateway Serviceのサイドカーは、受け取ったリクエストをSensor Gateway Serviceの/state/{sensor_id}というエンドポイントに転送する。 ⑯ Sensor Gateway Serviceは、受け取ったリクエストを元に、Sate Gateway ServiceのState Managementの機能を使って、Redisに保存されているセンサーの状態を取得する。 ⑰ Sensor Gateway Serviceのサイドカーは、Redisにアクセスしてセンサーの状態を取得した後、その状態をDashboard Serviceに返す。 ファイルの構成 デモアプリのファイル構成は以下のとおりです。 . ├── AlertService │ ├── alertservice │ │ └── app.py │ └── requirements.txt ├── DashboardService │ ├── dashboard │ │ └── app.py │ └── requirements.txt ├── Infrastructure │ ├── components │ │ ├── mqtt-binding.yaml │ │ ├── notify-binding.yaml │ │ ├── pubsub.yaml │ │ ├── secretstore.yaml │ │ └── statestore.yaml │ ├── docker-compose.yml │ ├── mosquitto.conf │ └── secrets.json ├── SensorGatewayService │ ├── requirements.txt │ └── sensorgateway │ ├── app.py │ └── settings.py ├── Simulation │ ├── requirements.txt │ └── simulate.py └── common ├── pyproject.toml └── smartfarm_common └── models.py それぞれのファイルの内容をサービスごとに説明します。 Sensor Gateway Service ■ sensorgateway/app.py センサーのデータを受信して処理するサービスです。センサーデータを受信して、異常値かどうかを判断し、データを保存したり、異常値であればアラートを送ったりします。 Input Bindingsを使ってMQTTからデータ受信 State Managementを使ってデータ保存(Redis) Pub/Subでイベント発行(RabbitMQに正常データ、異常データをそれぞれ別のトピックに送る) DashBoard Serviceからのリクエストに応じて、センサーの状態を返すエンドポイントを提供 ■ sensorgateway/settings.py アラートのしきい値など各種設定を管理するファイルです。DaprのSecrets Management機能を使って、ローカルのJSONファイルから設定値を取得するようになっています。 Alert Service ■ alertservice/app.py アラート通知を担当するサービスです。異常なセンサーデータを検知した際に、通知処理を行います。 メッセージキュー(RabbitMQ)から異常データ(alertトピックに送られたデータ)を受信 異常時にOutput Bindingsの機能を使って、メールで通知 Dashboard Service ■ dashboard/app.py ユーザー向けにデータを表示するサービスです。以下の機能を提供します。 /sensors/{sensor_id}というAPIエンドポイントを提供して、センサーの状態を返却 Infrastructure ■ components/mqtt-binding.yaml MQTTからデータを受信するためのInput Bindingsのコンポーネント定義ファイルです。 ■ components/notify-binding.yaml メールを送るためのOutput Bindingsのコンポーネント定義ファイルです。maildevというローカルで動かすことができるメールサーバーに対して、SMTPプロトコルを使ってメールを送るように設定されています。 ■ components/pubsub.yaml RabbitMQをPub/Subのコンポーネントとして使用するためのコンポーネント定義ファイルです。 ■ components/secretstore.yaml ローカルのJSONファイルをSecrets Managementのシークレットストアとして使用するためのコンポーネント定義ファイルです。 ■ components/statestore.yaml RedisをState Managementのステートストアとして使用するためのコンポーネント定義ファイルです。 ■ docker-compose.yml RabbitMQ、mosquitto(MQTTブローカー)、maildev(SMTPサーバー)をDockerコンテナで起動するためのdocker-composeファイルです。 ■ mosquitto.conf mosquitto(MQTTブローカー)の設定ファイルです。MQTTのポート番号や、認証の設定などが記述されています。 ■ secrets.json DaprのSecrets Managementの機能を使って、ローカルのJSONファイルをシークレットストアとして使用するためのJSONファイルです。アラートのしきい値などの設定値が定義されています。 Simulation ■ simulate.py センサーのシミュレーターです。センサーデータを生成して、MQTTに送る役割を持っています。 common ■ smartfarm_common/models.py このファイルは、Sensor Gateway Service、Alert Service、Dashboard Serviceの3つのサービスで共通して使用するモデルクラスを定義しています。センサーデータの構造などを定義しています。 実行方法 このデモアプリを実行するための方法を説明します。以下の手順に従ってください。 事前準備 既にDapr CLIをインストールしている場合は、Dapr CLIのインストールはスキップしていただいて構いません。まだインストールしていない場合は、以下のコマンドを実行して、Dapr CLIをインストールしてください。 $ wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash $ dapr init リポジトリのクローンとディレクトリ移動 このデモアプリが公開されているGitHubリポジトリをクローンして、プロジェクトのディレクトリに移動します。 $ git clone https://github.com/noriyukitakei/dapr-demo.git $ cd dapr-demo 必要なDockerコンテナの起動 デモアプリの稼働に必要な3つのコンテナ(RabbitMQ、mosquitto、maildev)をDocker Composeを使って起動します。 $ cd Infrastructure $ docker-compose up -d Sensor Gateway Serviceの起動 Pythonの仮想環境を作成して有効にします。 $ cd .. /SensorGatewayService $ python -m venv .venv $ source .venv/bin/activate 必要なライブラリをインストールします。 $ pip install -r requirements.txt Sensor Gateway Serviceのアプリケーションとサイドカープロセスを起動します。 $ dapr run --app-id sensorgateway --app-port 6100 --dapr-http-port 3610 --components-path .. /Infrastructure/components -- python -m uvicorn sensorgateway.app:app --port 6100 Alert Serviceの起動 Sensor Gateway Serviceはフォアグラウンドで起動したままにしておいてください。新しいターミナルを開いて、Alert Serviceを起動します。 Pythonの仮想環境を作成して有効にします。 $ cd [ GitHubリポジトリのクローン先ディレクトリ ] /AlertService $ python -m venv .venv $ source .venv/bin/activate 必要なライブラリをインストールします。 $ pip install -r requirements.txt Alert Serviceのアプリケーションとサイドカープロセスを起動します。 $ dapr run --app-id alertservice --app-port 6101 --dapr-http-port 3611 --components-path .. /Infrastructure/components -- python -m uvicorn alertservice.app:app --port 6101 Dashboard Serviceの起動 Sensor Gateway ServiceとAlert Serviceはフォアグラウンドで起動したままにしておいてください。新しいターミナルを開いて、Dashboard Serviceを起動します。 Pythonの仮想環境を作成して有効にします。 $ cd [ GitHubリポジトリのクローン先ディレクトリ ] /DashboardService $ python -m venv .venv $ source .venv/bin/activate 必要なライブラリをインストールします。 $ pip install -r requirements.txt Dashboard Serviceのアプリケーションとサイドカープロセスを起動します。 $ dapr run --app-id dashboardservice --app-port 6102 --dapr-http-port 3612 --components-path .. /Infrastructure/components -- python -m uvicorn dashboard.app:app --port 6102 シミュレーターの起動と動作確認 Sensor Gateway ServiceとAlert ServiceとDashboard Serviceはフォアグラウンドで起動したままにしておいてください。新しいターミナルを開いて、センサーのシミュレーターを起動します。 Pythonの仮想環境を作成して有効にします。 $ cd [ GitHubリポジトリのクローン先ディレクトリ ] /Simulation $ python -m venv .venv $ source .venv/bin/activate 必要なライブラリをインストールします。 $ pip install -r requirements.txt シミュレーターを起動します。すると、定期的にセンサーデータが生成されてMQTTに送られます。 $ python simulate.py Publishing telemetry to mqtt://localhost:1883 topic = farm/telemetry sent: { 'sensorId' : 'sensor-003' , 'temperature' : 29.89 , 'humidity' : 60.44 , 'timestamp' : '2026-04-07T00:44:04.142516+00:00' } sent: { 'sensorId' : 'sensor-002' , 'temperature' : 24.09 , 'humidity' : 60.69 , 'timestamp' : '2026-04-07T00:44:05.145853+00:00' } sent: { 'sensorId' : 'sensor-002' , 'temperature' : 33.83 , 'humidity' : 41.07 , 'timestamp' : '2026-04-07T00:44:06.147417+00:00' } sent: { 'sensorId' : 'sensor-003' , 'temperature' : 22.22 , 'humidity' : 35.16 , 'timestamp' : '2026-04-07T00:44:07.149945+00:00' } Alert Serviceのコンソールには、異常値が検知されたときに以下のような出力がされます。 == APP == WARNING:alertservice:ALERT received: Temperature 42 .27C exceeds threshold 40 .0C == APP == INFO: 127.0 .0.1:53585 - "POST /alerts HTTP/1.1" 200 OK == APP == WARNING:alertservice:ALERT received: Temperature 44 .84C exceeds threshold 40 .0C == APP == INFO: 127.0 .0.1:53660 - "POST /alerts HTTP/1.1" 200 OK メールが送付されているはずですので、見てみましょう。maildevは、http://localhost:1080でWeb UIが提供されているので、ブラウザでアクセスしてみてください。すると、Alert Serviceから送られたメールが届いていることが確認できます。 DashBoard Serviceにアクセスして、センサーの状態を確認してみましょう。センサーのIDがsensor-001とした場合、以下のURLにアクセスしてみてください。 $ curl http://localhost:6102/sensors/sensor-001 { "found" :true, "state" : { "sensorId" : "sensor-001" , "temperature" :31.88, "humidity" :45.29, "timestamp" : "2026-04-07T00:51:40.901649Z" , "updatedAt" : "2026-04-07T00:51:40.906840Z" } } 上記のようなレスポンスが返ってくれば、Dashboard ServiceがSensor Gateway Serviceに対してリクエストを送って、センサーの状態を取得することができていることが確認できます。 まとめ 今回は、Daprの基本的な機能であるPub/Sub、Bindings、Service Invocation、Secrets Managementについて、わかりみ深く説明しました。これらの機能を組み合わせることで、ちょっぴり本格的なマイクロサービスを作ることができることも紹介しました。 Daprは、マイクロサービスを開発するための便利な機能を提供しているので、ぜひ活用してみてください。Daprを使うことで、マイクロサービスの開発がより簡単になり、開発者はビジネスロジックの実装に集中することができるようになります。Daprは、マイクロサービスの開発を加速させるための強力なツールであると言えるでしょう。 ご覧いただきありがとうございます! この投稿はお役に立ちましたか? 役に立った 役に立たなかった 0人がこの投稿は役に立ったと言っています。 The post 世界一わかりみの深いDapr first appeared on SIOS Tech Lab .
はじめに こんにちは!26卒エンジニアで、2025年9月からレバレジーズで内定者インターンをしている谷口功樹です! この記事を通して、 内定者インターンとしてTQCチームで活躍してきた 中で、僕がどのようにエンジニアとしての視点を広げていったのかを振り返ります。 突然ですが、皆さんは歩いている時、普段どこを見ていますか? 転ばないように地面を見ていますか? それとも、誰かとぶつからないように真っ直ぐ前を見ていますか? 僕は、意識的に空を見上げるようにしています。かつて下ばかり向いていた自分が、いかに小さな世界に閉じこもっていたかに気づいてしまったからです。 空を眺めていると、自分の可能性も無限にあるような、そんな気がしてきます。 …と、いきなりこんな抽象的なことを語ったのには理由が当然あります。 今回のインターンで僕が感じた、まさにそんな自分の意思での 「視界と可能性の広がり」 をお伝えできればと思います! 自己紹介 改めて自己紹介をさせていただきます。谷口功樹です。こんな普通の名前ですが、 実はアメリカ生まれのアメリカ育ちです! 日本生活歴は、留学と夏休みの旅行を含めて合計2年くらいです。 大学ではコンピューターサイエンスを専攻し、主にAIや統計学を学んでいました。同時に、日本語と日本文化の授業も頻繁に履修していました。その縁もあり、2024年の春には京都大学へ留学していました。 なぜ日本で就職?なぜレバレジーズ? その京都での留学が、僕にとって大きな転換点となりました。実際に日本で生活してみた中、自分のライフスタイルは日本に合っていると確信し、卒業後は 日本でキャリアを築きたい と考えるようになりました。 その上、就活を通して、アメリカと日本のソフトウェアエンジニア業界の違いを痛感したことも理由の一つです。現在のアメリカでは、AIの台頭もあって、新卒エンジニアの枠が爆速で縮まり、実際に僕もアメリカの企業から面接の案内をいただくことは一度もありませんでした。 アメリカの企業にとっての新卒は、将来への投資ではなく、単なる安価な労働力に過ぎなかったのではないかと、勝手ながらそんな風に感じてしまいました。一方で、日本の企業には、まだ若芽の可能性を信じて、ポテンシャルを育てようとする文化が残っているように見えました。 そんな中参加した 2024年11月のBoston Career Forumで出会ったのがレバレジーズでした。 面接で一番印象に残ったのは、会社側が「僕がどれほどの価値を提供できるか」だけでなく、「会社が僕にどんな経験を提供できるか」を真剣に考えてくれたことです。一方的な採用ではなく、お互いの成長を模索するその姿勢に惹かれ、2025年5月に卒業した直後、内定をいただき即承諾、そして9月から内定者インターンとして参加することを決めました。 インターン中の配属はTQCチーム!てかTQCって何? インターン開始前の面談で、こんな質問をされました。 「将来正社員になった時にやるような業務(つまりサービス開発)、それともあえてやらなさそうなこと、どっちを経験してみたい?」 思わぬ質問に数秒間迷いましたが、今回のインターンにおける僕の目標はすでに明確でした。 日本での生活に慣れる 日本のビジネスカルチャーを知る とにかく新しいことを学ぶ そしてやはり、僕は「やらなさそうなこと」を選択しました。そうして配属されたのが、テクノロジー戦略室のTQCチームでした。 「...でTQCって何?」 と思われるかもしれません。少なくとも僕は思いました。TQCはTotal Quality Controlの略で、開発されているサービスやアプリの品質管理を行っており、現在はセキュリティと負荷・パフォーマンス領域に注力しております。ただバグを見つけるだけでなく、開発工数削減や顧客の信頼獲得、そしてビジネス面での損失回避など、会社を影から支える重要な役割を担っています。 品質向上への道 レバレジーズの内定者インターンとは、インターンっていうより、ほぼ正社員である。 もちろん正式な立場は「インターン」ですが、任される業務は周りのエンジニアとあまり変わりません。TQCチームの一員として、主に以下の3つの実務に取り組みました。 ペネトレーションテスト 攻撃者の視点に立ち、社内サービスの脆弱性を探りました。具体的には、Burp Suiteを使ってHTTPリクエストを解析・改ざんしたり、Pythonやシェルスクリプトを書いてテストを自動化したりしました。おまけで、各サービスの技術スタックを横断的に知る良い機会にもなりました。 やってみて一番驚いたのは、大手企業の開発チームが作ったプロダクトでも、完全に脆弱性を無くすことはできないと言うことです。もちろんTQCチームの充実したテスト項目のマニュアルがあったからこそ見つけられた面もありますが、最初は「初心者の自分でもこんなに簡単に見つけてしまえるのか」と驚くと同時に、この作業の重要性を強く実感しました。 負荷試験 負荷試験は2回しか実施しませんでしたが、その2回だけでも色々知識は得られました。 詳しくは 僕の上司が書いた記事 に全てありますが、流れをざっくり説明すると、ブラウザ操作を.harファイルとして保存し、JavaScriptで試験シナリオを記述できる負荷試験ツールである k6 が読み込める形式にhar-to-k6というライブラリで変換して実行するというものです。監視にはGrafana、Prometheus、AWS CloudWatchを活用し、ダッシュボードを自作するためにPromQLを、負荷がスケーリングにどう影響するか理解するためにKubernetesも少し学習しました。 特定の脆弱性を突くペネトレーションテストとは違い、負荷試験はサービス全体を俯瞰して捉えることが重要だと学びました。 自作のGrafanaダッシュボード インフラ構築 「とにかく新しいことを学ぶ」ため、未経験だったクラウドインフラ、Terraform、GitHub Actions(CI/CD)をセットで扱える課題を上司に頼みました。 具体的には、GitHub ActionsとTerraformを連携させてAWS上にApacheサーバーを立ち上げ、任意のDockerコンテナを稼働できる環境を構築しました。そして運用性を考慮し、CloudWatch Logsで簡単にログを確認できる仕組みも併せて実装しました。 この構築作業はインフラの基礎を体系的に学ぶ良い機会となっただけでなく、後述するAIエージェントの検証環境としても、重要な役割を果たすことになりました。 いよいよ実践!ペネトレーションテスト自動化エージェントの開発 そして、今回のインターン期間の集大成は、PythonとGoogle ADK(Agent Development Kit)を駆使した、 ペネトレーションテスト自動化エージェントの開発 でした! 簡単にいうと、AIにぺネトレーションテストを自律的に行ってもらうツールなのですが、単にプロンプトを投げて暴れてもらうだけではありません。エージェントがただ闇雲に動くのではなく、論理的に動くための設計に、力を入れました。 大学でAIや統計学を履修していたとはいえ、AIの数学的な理論や基礎的なアルゴリズムの学習が中心だったので、LLMやエージェントの実装は、 僕にとって全く新しい挑戦でした。 開発に着手して真っ先に直面したのが実行基盤の選定で、Google Cloudの利用を前提としつつ、ネイティブな管理環境ながら10分でタイムアウトしてしまうVertex AI Agent Engineや、1時間制限のCloud Run Serviceでは長時間のスキャン要件を満たせないことが分かりました。最終的には、最大1週間の実行が可能なCloud Run Jobsを採用するという判断を経て、ようやくエージェント本体の開発へと進むことができました。 1. 有向二部グラフで構造化する まず、エージェントの思考プロセスを、「タスク」と「発見」という2種類のノードで構成される有向二部グラフとして定義しました。 ToolAgent(実行役) : 割り振られた「タスク」を実行し、その出力結果から脆弱性の手がかりとなる「発見」を抽出します。 PlanningAgent(計画役) : 得られた「発見」を分析し、次に行うべき「タスク」を新しく生成します。 キューやスタックは実行順序を管理するための一時的なものですが、グラフは調査の論理そのものを蓄積するために存在します。木構造ではなくグラフを採用することで、複数の調査経路が同じ結論に辿り着いた際に情報を集約し、後続タスクの重複発生を抑えられるようになりました。さらに、ノードをタスクと発見に分ける二部グラフの制約を設けることで、実行と計画という役割を分離し、事実に基づかない計画を防いでいます。これらを有向の依存関係で結ぶことで、どの発見がどのタスクに起因したのかという因果関係を正確に記録し、診断の履歴を遡れる仕組みを整えました。 この 「タスク → 発見 → タスク」 という再帰的なサイクルをグラフ構造に落とし込むことで、複雑な攻撃ルートを論理的に切り開いていく仕組みを構築しました。 2. 評価関数で枝刈り 無限に広がる探索ルートを制御するため、評価関数を実装しました。 f(n) = h(n) - g(n) h(n) = 目標とする脆弱性の深刻度 g(n) = (1 - 親発見のスコア) * 親の累積コスト + 当タスクのコスト g(n)は、親ノードである「発見」の重要度を反映します。親の重要度が低いほどコストが跳ね上がる設計になっており、計算結果が閾値を下回ったタスクは実行価値なしとして破棄されます。 これにより、重要なルートのみを効率よく辿ることができます。 3. コールバック関数を用いたLLMのトークン管理 ペネトレーションテストの実行は数時間に及ぶことを想定 していたため、情報の取捨選択をしなければ一回のセッションで消費される入力トークン数は数百万規模に膨れ上がり、 コストの増大やコンテキストの上限超過を招くことを確認しました。 この課題を解決するために、 Google ADKのコールバック関数を活用しました。 コールバック関数とは、LLMの推論やツールの実行の前後のタイミングで、あらかじめ指定しておいた独自のプログラムを自動的に実行させる仕組みのことです。 これを用いることで、LLMに送る情報やメタデータをプログラム側で編集できるだけでなく、出力される情報の検証や加工も可能になります。 まずモデルへリクエストを送る直前に、過去のタスクのメッセージ履歴を全て削除する処理を実装し、エージェントが外部の情報に惑わされず、現在の作業のみに集中できる環境を作りました。 また、ツールの呼び出し回数が上限に達した際には、モデルからツール定義自体を剥奪して強制的に結果報告へ移行させる制御も行っています。 出力側では、ツールから返ってくる実行結果を外部ストレージに保存し、LLMに先頭の2000文字のみを渡すことでトークン消費を大幅に抑制しました。 そして、例えば何もツールを使わずに回答しようとしたりした場合には、空のリストを強制的に返却させて実行を正常に終了させるガードレールも設けています。 こうした 入出力管理が、エージェントを実用的なコストと精度で長時間稼働させるための鍵 となりました。 トークン費用の比較 4. フロントエンド フロントエンドは、Pythonでwebアプリを構築できるフレームワークである Streamlit を採用し、スキャン設定管理や進捗確認が可能な管理画面を構築しました。 上記で話したグラフを可視化するため、StreamlitのReactコンポーネントとしてD3.jsと React Flowを組み込み、エージェントの思考が広がっていく様子をリアルタイムで描画できるようにしました。 5. 動作確認 前章で構築したAWS環境に、わざと多数の脆弱性を含ませたセキュリティ学習用のデモアプリである OWASP Juice Shop をデプロイし、エージェントを走らせました。可視化したグラフが広がっていく様子を眺めていて、 インターン期間中に取り組んできたすべてが、この一つの完成したプロジェクトへと繋がっていて 、すごい達成感を感じました。 終わりに インターン開始時に挙げた3つの目標 について、達成できたか振り返ってみます。 日本での生活には、この半年でかなり慣れました。 TQCチームでの業務を通して、**新しいことも存分に学びました。 では、日本のビジネスカルチャーについては…? まあ、上司や同僚との接し方は学びましたが、完璧にこなせたかと言われると微妙かもしれません。でもその代わりに、 レバレジーズのエンジニアは皆自由である という、この会社ならではの最高の文化を肌で感じることができました。 よく他の記事でも語られていることですが、 レバレジーズは本人が望みさえすれば、どこまでも成長できる場所です。 来月からは、いよいよ正社員としてのスタートです。さらに成長し続け、レバレジーズを盛り上げていく僕のこれからの活動に、ぜひ注目してください! それではまたいつか! We are hiring! レバレジーズ株式会社では一緒にサービスを開発してくれる仲間を募集中です。 もしご興味を持っていただけたなら、以下のサイトからご応募ください。 HRMOS求人ページ hrmos.co 会社説明資料 speakerdeck.com


















