TECH PLAY

MNTSQ

MNTSQ の技術ブログ

85

前回のあらすじ スキーマ 分離設計のDB(テナント毎に独立した スキーマ を持つDB)でサービス規模が拡大すると、 スキーマ 数の増加に由来するオーバーヘッドが無視できないものになる 次はパラメータチューニングなどで何とか延命できないか試してみたい tech.mntsq.co.jp はじめに 前回の 負荷試験 によって、弊社サービスは600テナントを超えたあたりから、データベースの急激な性能劣化を起こすリスクが高いことが判明しました。長期的には根本的な構成の見直しを行うとして、パラメータチューニングなどでデッドラインを後ろにずらせるのであれば、それはそれでありがたいです。 よって、今回の 負荷試験 の目的は、 チューニングによって アーキテクチャ 改善をどの程度後ろ倒しにできるかを検討することでした(過去形) 。 結論を申し上げると、弊社のケースではパラメータチューニングによって延命を図れる見込みは薄いことが判明しました。ただし、調査の過程で、前回の結果の解釈の誤りを発見したり、新たな条件で見えてきた スキーマ 分離の定性的な特性を発見したりと、それなりに実りのある結果が得られたので、再び報告させていただきたいと思います。 前回のあらすじ はじめに 追加試験の結果報告 真のボトルネックはwait/io/table/sql/handlerだった パラメータチューニングについて table_definition_cache & table_open_cache innodb_sync_array_size インスタンスサイズを大きくしてみる 一定高負荷下でのスキーマ数によるパフォーマンスの変化 150スキーマで性能が劣化する理由 1200スキーマで性能が劣化する理由 負荷試験を行う上で注意したいこと 本番環境と試験環境の違いを考える 測定の分散について おわりに 追加試験の結果報告 真の ボトルネック は wait/io/table/sql/handler だった wait/io/table/sql/handler は、ストレージエンジン層における行レベルの操作(読み取り、挿入、更新、削除など)に対して、 SQL 層が費やした待機時間を計測するイベントです。一般には物 理I /O、行レベルのロック待ちなどが主原因になります。 前回の結果では wait/synch/mutex/innodb/dict_sys_mutex が ボトルネック になると誤った結論をつけてしまいましたが、これは暖機運転(ウォームアップ)不足による一時的な現象でした。以下は暖機運転完了前後の待機時間の内訳を、PeformanceInsiteの 平均アクティブセッション(AAS) のグラフで確認したものです。 暖機運転完了前後の待機時間の内訳 これを見るとvCPU数を大幅にオーバーして wait/synch/mutex/innodb/dict_sys_mutex の待機イベントが支配的になっている時間(左側: 暖機運転中)と、vCPU数以下の範囲内で wait/io/table/sql/handler が支配的になっている時間(右側: 暖機運転完了)がハッキリと分かれていることが確認できます。 wait/synch/mutex/innodb/dict_sys_mutex は データディクショナリ へのアクセス競合でしたが、必要な メタデータ がすべてメモリに乗り切れば、基本的に競合は発生しません。よって、今回の測定の条件下では、 wait/synch/mutex/innodb/dict_sys_mutex が発生している場合は暖機運転が十分でないと言えます。 また、このような wait/synch/mutex/innodb/dict_sys_mutex の変化は、今回の測定条件内では table_definition_cache (テーブル定義の メタデータ を載せるキャッシュ)などのキャッシュサイズが十分に足りていることを示唆しています。 以降の測定は暖機運転を十分に行い、この待機イベントの変化を確認してから本測定を行いました。 なお、今回の検証では最終的にこの wait/io/table/sql/handler の ボトルネック を解消することはできませんでしたが、高並列・高QPSの負荷環境においては、ストレージエンジンが膨大なリクエストを処理する上で避けられない現象であると解釈しています。wait/io/table/ sql /handler`の待機時間の内訳を、クエリの種別ごとに分けたものが以下の画像です。 負荷を構成しているクエリの比率がそのまま待機時間の比率になっていました 。したがって、特定のスロークエリがこれ発生させているわけではないことがわかります。 `wait/io/table/ sql /handler`のクエリ種別ごとの内訳 パラメータチューニングについて チューニング( インスタンス サイズなどの変更も含む)を試みた項目について、端的に結果をまとめていきます。結論、 今回の測定条件下では 、パフォーマンスの改善はほとんどみられませんでした。 table_definition_cache & table_open_cache table_definition_cache は、テーブルの定義情報( メタデータ )を保持するグローバルなキャッシュです。 データディクショナリ にアクセスする代わりにこのキャッシュを参照することで、テーブルの定義参照処理を高速化することができます。 table_open_cache は、各スレッドがテーブルにアクセスする際に使用するオブジェクトを保持するキャッシュです。テーブルを物理的にオープンする処理は CPU コストが高いため、このキャッシュに保存されたオブジェクトを再利用することで、オーバーヘッドを劇的に低減できます。 前回の調査結果から、 メタデータ へのアクセス待機が ボトルネック であると予想していたため、キャッシュサイズを大きくすることで改善が見込めると思い、これらのパラメータについて調査を行いました。 チューニングが有効でなかった理由は、先ほどの暖機運転の議論でも述べた通り、今回の測定においてはキャッシュサイズは最初から十分足りていたためです。キャッシュヒット率も確認しましたが、99.555%とほとんどキャッシュミスは発生していませんでした。前回も述べたとおり、負荷は十数種類のパターンのクエリで作っており、参照テーブルも数種類にとどまります。そのため、 メモリの使い方は本番環境の方が圧倒的にハードであり、今回の測定ではメモリへの負荷や改善策を評価することができません。 もしも本番環境で 頻繁にキャッシュミスが発生していた場合には、これらのパラメータのチューニングはパフォーマンスを改善する上で非常に有効 になります。 innodb_sync_array_size innodb_sync_array_size は、内部同期用の配列サイズを調整するパラメータで、CPUコア数の多い環境で値を大きくすると、複数のスレッドが内部ラッチを取り合う際の競合 wait/synch/mutex/innodb/sync_array_mutex を緩和し、スケーラビリティを向上させることができます。 このパラメータを増やすと、待機中スレッドの多いワークロードの同時実行性が高まるというのが通説なので、調査を行いました。 チューニングが有効でなかった理由はシンプルで、 wait/synch/mutex/innodb/sync_array_mutex が発生していなかったので調整する必要がなかったというものです。 インスタンス サイズを大きくしてみる これまでの測定では2xlargeを使用していましたが、これを4xlargeにスケールアップした際に性能が改善するかを調査しました。クライアント側は24並列・各スレッドは500QPSの設定で負荷をかけました。以下は測定結果の一部です。 インスタンス TotalQPS QueryCount P99[μs] Avg [μs] r8g.2xlarge 8450 15,210,198 4.44E+03 2.36E+03 r8g.4xlarge 9233 16,618,969 3.15E+03 2.06E+03 4xlargeの方が全体的に性能が改善しているのがわかります。しかし、2xlarge -> 4xlargeではCPU、メモリ共に2倍になっているにも関わらず、性能の改善はQPS換算でせいぜい10%程度でした。これは費用対効果としては非常に効率が悪いです。 この理由は、CPUを使用しているセッションの内訳で説明できます。 2xlarge 4xlarge 両方とも主要な待機は wait/io/table/sql/handler です。4xlargeの方はCPUが2倍になっているため縦軸の縮尺が異なりますが、その絶対値はほとんど変わっていないことがわかります。2xlarge時点でCPUは ボトルネック ではなかった(CPU数を超えたアクティブセッションが発生していなかった)ため、4xlargeに変更してCPUが増えても、目に見えた性能改善はできなかったと解釈できます。逆に、このグラフでCPU数を超えて待機しているセッションが多くなった場合は、CPUの数を増やすことで大きな性能の改善が期待できます。 したがって、 インスタンス のスペックアップは、必ずしも限界を迎えた際の応急処置として使えるとは限らない と言えます。 一定高負荷下での スキーマ 数によるパフォーマンスの変化 Aurora MySQL のパフォーマンスと スキーマ 数の関係をより掘り下げて調べてみました。前回とは以下の点を変更し、150 ~ 1200 スキーマ にて再測定を行いました。 クライアントの負荷設定を固定する(24並列 * 500QPS = 12000TotalQPS) 十分な暖機運転を行いキャッシュに乗り切ったことを確認してから本測定を行う 測定結果を以下にまとめます。かなり 直感とは異なる結果 になりました。 高負荷で固定した際のパフォーマンスの変化 なんと、600 スキーマ が最も スループット が高く、 スキーマ 数が増えた時だけではなく、少なくなった時にも性能の劣化が見て取れます。 そして150, 600, 1200 スキーマ での測定時のAASのグラフは以下のようになっていました。色は異なりますが、支配的となっているのはすべて wait/io/table/sql/handler です。 150 スキーマ 600 スキーマ 1200 スキーマ 600 スキーマ での測定で最も平均AASが少なくなっています。また、1200 スキーマ に加えて、150 スキーマ での測定でも wait/io/table/sql/handler の待機時間が増加しています。 150 スキーマ で性能が劣化する理由 150 スキーマ 構成では、600 スキーマ 構成と比較してテーブルあたりのアクセス密度が4倍になります。たとえ スキーマ が分かれていても、 InnoDB 内部の管理用ハッシュテーブルやラッチ( 排他制御 )は インスタンス 全体で共有、あるいは少数の パーティション で管理されています。 150 スキーマ では管理対象が少ない分、 特定の管理 パーティション にアクセスが集中する「 ホットスポット 」 が発生しやすく、内部的な順番待ちが頻発します。この微細な足止めが積み重なり、結果としてストレージエンジン層の処理時間である wait/io/table/sql/handler を引き延ばしていると考えられます。満遍なく一定の遅延が発生していることも、この説を裏付けます。 150 スキーマ 1200 スキーマ で性能が劣化する理由 一方、1200 スキーマ 構成での劣化は、150 スキーマ の時とは逆に 「管理対象の膨大さ」によるオーバーヘッド が原因であると考えられます。 今回の測定ではキャッシュヒット率が99.5%を超えており、ディスクI/Oの影響は無視できます。しかし、これほど大量の スキーマ が存在すると、 InnoDB が高速化のために生成する「 アダプティブ ハッシュインデックス(AHI)」などの管理データ自体が巨大化します。この巨大化したデータの中から目的のデータを探し出す際、たとえメモリ上であっても管理 パーティション の奪い合いが発生します。この様子は wait/synch/sxlock/innodb/hash_table_locks の待機時間(茶色)として現れており、これに比例して wait/io/table/sql/handler が増加していることがわかります。 つまり、分散のメリットよりも、巨大なリソースを管理・検索するコストが上回ってしまった状態と言えます。 1200 スキーマ 逆に言えば、適切な範囲内であれば スキーマ を分離して負荷を分散すること(パーティショニング)によってパフォーマンスが向上するケースもあると言えます。(これは前回時点では認識していなかった スキーマ 分離のメリットですね) なお、今回の測定はクエリの種類とテーブルを絞り、メモリ負荷が スキーマ 数に対して非常に少なくなるような条件で行ってます。実際に大量 スキーマ のテーブルに対して満遍なくアクセスが発生する場合は、 wait/synch/mutex/innodb/dict_sys_mutex ( データディクショナリ へのアクセス競合)などが支配的になることも十分に考えられます。 負荷試験 を行う上で注意したいこと 負荷試験 において最も注意しなければならないのは、 「得られた数値を絶対視し、誤った解釈をしてしまうこと」 です。今回の測定を通して見えてきた、試験設計と結果評価における注意点をまとめます。 本番環境と試験環境の違いを考える 前回、今回の 負荷試験 は、 上位のものとはいえ、使用するクエリやアクセスするテーブルを絞った環境にて行ったものになります。また、アクセスの並列数も24並列と実際のワークロードに比べると少ないものであり、1スレッドあたりの負荷を上げて総量を補っているとはいえ、本番環境を再現しているとは言い難いです。 よって、今回は以下のような点に注意して結果を評価しています。 「キャッシュ不足」は評価できても、「キャッシュ十分」とは評価できない 負荷試験 はあくまでよく使われている一部のクエリ、テーブルのみをサンプリングしている。使用されないテーブルはキャッシュに乗らない 結果は鵜呑みにするのではなく、定性的に再解釈する 例えば負荷の総量が同じでも、1並列でかける負荷と100並列でかける場合ではDB内部の 排他制御 は全く異なるため、結果の数値をそのまま受け取ってはいけない。数値をそのまま本番のキャパシティとして解釈するのではなく、振る舞いの定性的な変化を読み取ることに主眼を置くべき 負荷試験 には、 条件設定によって評価できるものと評価できないものがある ということです。また、数値をそのままの状態で受け取ることもミ スリード を生む危険があります。このような点に注意して試験設計、結果評価を行いましょう。 測定の分散について 以下は全く同じ負荷設定で、 数時間以内に 測定した結果を比較したものです。サンプルは少ないですが、QPS換算で1%程度の分散におさまっています。 TotalQPS QueryCount P99[μs] Avg [μs] 8554 15,398,029 4.73E+03 2.34E+03 8459 15, 226 ,641 4.49E+03 2.37E+03 8460 15,228,216 4.32E+03 2.36E+03 対してこちらは全く同じ負荷設定で、 日付を跨いで 測定した結果を比較したものです。 TotalQPS QueryCount P99[μs] Avg [μs] 9118 16,412,667 4.60E+03 2.09E+03 8450 15,210,198 4.44E+03 2.36E+03 なんと、QPS換算で 7.3%程度の差 が出てしまっています。マネージドサービスゆえの不可避な外部要因( AWS の帯域や近隣 インスタンス の負荷)によるものだと思っていますが、この結果は、測定そのものの分散よりも時間帯による分散の方が遥かに大きなことを示唆しており、 最低でも7%以上の分散 があることがわかります。 つまり、 この測定では「5%程度の性能改善」を論じても意味がない わけです。その数値は誤差の範囲に埋もれてしまうからです。結果を数値的に求めたい場合は、測定自体の誤差がどの程度なのかを評価する必要があり、それができない限りは 定量 的な評価は難しいということです。間違っても、1回だけ測定して「こんな数値が取れました!」という結果の受け取り方はしないようにしましょう。 おわりに 前回記事の内容と合わせて本格的な 負荷試験 を行い、当初の目的であった現行 アーキテクチャ のデッドラインを見積もるという目的は無事達成できました。しかし、目的達成以上に、その試行錯誤の過程から多くのことを学ぶことができたと思います。 「 スキーマ 数が増えれば管理コストで遅くなる」という一般論は知っていても、実際に手を動かして測定を行い、予想と異なる振る舞いに悩み、考え抜いたことで、より MySQL に関する理解は深まりました。そして、「推論と検証を繰り返すことで ブラックボックス を一つずつ明らかにしていく」というプロセスそのものが、エンジニアにとって何よりも大切な経験であり、自信にもつながることを再確認できました。 本記事の試行錯誤の過程が、これから 負荷試験 に挑む皆様にとって、一歩を踏み出すための参考になれば幸いです。 MNTSQ株式会社 SRE 西室
アバター
はじめに 課題感 スプリットビュー DNS サンプルコード おわりに 参考 はじめに 小ネタです。記事タイトルが長いのですが、これは本稿の内容を1行で説明したものになります。 そして OpenSearch に限らない一般的な話題としては repost.aws という優れた情報が既にあります。本稿では Terraform コードの例示や背景についての解説を与えることで、付加価値を与えんとするものになります。 課題感 OpenSearch を AWS 上のマネージドサービスとして扱う場合、 OpenSearch ドメイン を外部からのアクセスが可能なものとして構築するか、 VPC 内に閉じたものとして構築するかの2択があります *1 。後者を選ぶ場合 OpenSearch ドメイン へのアクセスは対象の VPC 内からのみアクセスが可能になります また OpenSearch ドメイン には作成後標準で払い出されるエンドポイントとは別にカスタムエンドポイントを設定することができます。これは apex を含む任意 ドメイン を利用者の所望のものにすることができるものになります *2 さて OpenSearch は通常 HTTPS による通信が強制されており、これは標準のエンドポイント / カスタムエンドポイント 両方で該当します。標準のエンドポイントについては AWS がよしなに SSL 証明書を設定してくれますが、カスタムエンドポイントについてはユーザ側で SSL 証明書を用意しなくてはなりません。 手軽にこのあたりをやるには ACM で SSL 証明書を払い出そうという話になり、より手軽にやろうとすればカスタムエンドポイント設定値としての ドメイン の存在確認を DNS 検証でやる *3 という段取りになるでしょう。 ただし ACM による DNS 検証は インターネット経由で名前解決が可能な DNS レコード に対してのみ対応可能です。 VPC に閉じる構成とした OpenSearch ドメイン に対し設定するカスタムエンドポイントで使う ドメイン をインターネットから名前解決可能とするメリットは無いでしょうから、これも VPC 内からのみ名前解決が可能な Route 53 の private hosted zone で設定することになります *4 。 つまり カスタムエンドポイントとして使用する ドメイン はインターネットから名前解決不可 になり、これで何が困るかというと ACM による DNS 検証が不可 となります。 いままでの議論をまとめると以下のようになります。 課題感 スプリットビュー DNS これを解決するには本稿題名に掲げたスプリットビュー DNS *5 が有効でしょう。 AWS においては Route 53 に関するドキュメントのうち こちら にあるとおり、Route 53 において private / public の両 hosted zone を組み合わせることで実現が可能です。すなわち example.com という public な hosted zone を Route 53 に追加する internal. example.com という private な hosted zone を Route 53 に追加する これだけです。 opensearch.internal.example.com という ドメイン を VPC 内に閉じた OpenSearch ドメイン のカスタムエンドポイントとして使用することを考えると opensearch というレコードで OpenSearch エンドポイントを向く CNAME レコードを private 側の Route 53 hosted zone( internal.example.com に対応)へ設定 opensearch.internal.example.com という ドメイン に対し ACM 上で SSL 証明書発行 2. の結果得られた DNS 検証用レコードを public 側の Route 53 hosted zone( opensearch.internal.example.com に対応)へ設定 ここまでに得られた ACM 上 SSL 証明書の ARN と ドメイン 名とを OpenSearch へカスタムエンドポイントとして設定 という手筈を踏めば、 ACM で DNS 検証が可能な OpenSearch カスタム ドメイン 向けの SSL 証明書が手に入り、かつ VPC 内に限って OpenSearch ドメイン にてカスタム ドメイン を使っての通信が可能、という状態にもってゆくことができます *6 。 opensearch .internal. example.com な OpenSearch カスタムエンドポイントを使う図 サンプルコード 上記構想を AWS マネジメントコンソールでやるのは少々骨です。実際に使用している Terraform コードに適宜修正を加えたサンプルコードを例示します variable "hostzone" { type = object ( { external = string internal = string } ) default = { external = "example.com" internal = "internal.example.com" } } variable "custom_endpoint" { type = string nullable = true default = "opensearch.internal.example.com" } resource "aws_vpc" "main" { # 詳細略 } /* Route 53 関連 各 hosted zone 定義と OpenSearch カスタムエンドポイントとして使うドメインに対応する DNS レコードの設定を行う */ resource "aws_route53_zone" "external" { name = var.hostzone.external } resource "aws_route53_zone" "internal_split_dns" { name = var.hostzone.internal vpc { vpc_id = aws_vpc.main.id } } resource "aws_route53_record" "opensearch_custom_endpoint" { # 本稿の例では常時 true だが var.custom_endpoint が無い場合はレコード作成不要なので条件分岐できるようにしておく for_each = ( var.custom_endpoint != null ? { "main" = {} } : {} ) zone_id = aws_route53_zone.internal_split_dns.zone_id /* var.custom_endpoint の apex ドメインは aws_route53_zone.internal_split_dns で指される Route 53 ゾーンに一致する必要がある この理屈から当該 apex zone に設定すべき OpenSearch ドメイン用 DNS レコードの値は var.custom_endpoint から apex zone 名を差っ引けば得られる */ name = replace (var.custom_endpoint, aws_route53_zone.internal_split_dns.name, "" ) type = "CNAME" ttl = 60 records = [ aws_opensearch_domain.main.endpoint, ] } /* ここから ACM 関連 証明書払い出しと DNS 検証のための各種設定までを行う */ resource "aws_acm_certificate" "internal_split_dns" { domain_name = var.custom_endpoint validation_method = "DNS" } resource "aws_route53_record" "internal_split_dns" { for_each = { for dvo in aws_acm_certificate.internal_split_dns.domain_validation_options : dvo.domain_name => { name = dvo.resource_record_name record = dvo.resource_record_value type = dvo.resource_record_type } } zone_id = aws_route53_zone.external.id allow_overwrite = true name = each.value.name records = [ each.value.record ] ttl = 600 # 適当に変更可 type = each.value.type } resource "aws_acm_certificate_validation" "internal_split_dns" { certificate_arn = aws_acm_certificate.internal_split_dns.arn validation_record_fqdns = [ for record in aws_route53_record.internal_split_dns : record.fqdn ] timeouts { create = "15m" } } resource "aws_opensearch_domain" "main" { /* カスタムエンドポイント関連以外の詳細は省略 実際に OpenSearch ドメインを構築するには他にも必須項目がある See: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/opensearch_domain */ domain_endpoint_options { enforce_https = true custom_endpoint_enabled = var.custom_endpoint == null ? false : true custom_endpoint = var.custom_endpoint custom_endpoint_certificate_arn = aws_acm_certificate.internal_split_dns.arn } encrypt_at_rest { enabled = true } node_to_node_encryption { enabled = true } } おわりに 簡単ではありますが OpenSearch 構成の tips について解説する項目となりました。カスタムエンドポイントを使わない場合、 OpenSearch のデフォルトでは https://search-<ドメイン名>-<ランダム文字列>.<リージョン>.es.amazonaws.com のようになり *7 、かつ VPC 内に閉じる場合ではより長く https://vpc-search-<ドメイン名>-<ランダム文字列>.<リージョン>.es.amazonaws.com となります。これが本稿の例でいえば https://opensearch.internal.example.com として扱えるようになり、シンプルなエンドポイントで OpenSearch を使うことが可能になります。 用途その他の事情で OpenSearch デフォルトのものではないエンドポイントを設定したくなり、またその対象となる OpenSearch ドメイン が VPC 内からのみ利用可能なものであった場合に、本稿の内容がお役に立てば幸いです。だいぶニッチな話題ではありますが、そういった需要は決してゼロではないと考えています。 MNTSQ 株式会社 SRE 秋本 参考 文中で示した引用以外のものとして、本稿の話題に取り組む前の調査時に 【Route53】スプリットビューDNSの名前解決順序を整理してみた | DevelopersIO を拝読しました。 *1 : https://docs.aws.amazon.com/opensearch-service/latest/developerguide/vpc.html *2 : https://docs.aws.amazon.com/opensearch-service/latest/developerguide/customendpoint.html *3 : https://docs.aws.amazon.com/acm/latest/userguide/dns-validation.html *4 : OpenSearch / ACM / VPC とお膳立てを AWS 前提として進めてきたので、いきなり DNS 関係も Route 53 を使うこととしてもさして矛盾はしないと思っています *5 : 本稿を書く上でこの定義が気になったので調べたところ https://datatracker.ietf.org/doc/html/rfc8499 や https://datatracker.ietf.org/doc/html/rfc6950/ が有用そうでした。本稿を読む上でこの内容を理解する必要は全くありません。これはほぼ私的なメモです *6 : 直下の図中 xN の表記は https://docs.aws.amazon.com/acm/latest/userguide/dns-validation.html に拠ります *7 : https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html
アバター
はじめに 弊社では AWS を主軸としたインフラ構成をもってプロダクトを展開していますが、一部では AWS 以外にも Azure および Google Cloud も活用しています。それぞれの棲み分けは以下のようなものになります。 AWS :ほとんど全て コンピュート / ネットワーク / ストレージ / DB / セキュリティ etc. Azure: OCR 関連 Google Cloud:LLM を使う処理および OCR 関連 利用規模としては AWS >> Google Cloud > Azure といったものになり、結果としてこれら クラウド ベンダーの利用にかかるコスト(利用料金の意。以下本稿では「コスト」を料金を指す意味でのみ用います)割合も AWS が支配的なものになります *1 。 これら クラウド ベンダーを扱う上でネックになることといえば勿論コストです。各ベンダーそれぞれコスト確認のための優れたツールを提供していますが、無論各ツールはそれぞれのベンダー内に閉じるものであり、例えば「今月 Google Cloud コストが普段と比べて妙な増え方をしているが AWS 側でも変動している形跡は無いだろうか?」といった調査をする場合、 AWS と Google Cloud とで行きつ戻りつしながらコスト調査を行う必要が出て来ます。 このあたりへの対処として、弊社では Datadog にコスト情報を集約する という選択肢をとることにしました。この内容について解説します。 構成 概要を以下に示します。 図の複雑さの違いが示すとおり、Datadog にコスト情報を連携させる戦略は各 クラウド ベンダーによって異なります。やりかたは一通りではないという前置きをしつつ、弊社では以下のような手法をとっています。 AWS :consolidated billing アカウントで Cost & Usage Report (CUR) を生成し、Datadog で CUR の内容を収集する AWS Organizations 管理下にある全 AWS アカウントのコスト情報を Datadog に連携する 弊社では例外なく全 AWS アカウントが AWS Organizations 管理下にあるので、個別に連携を頑張るよりも合理的 弊社では AWS Organizations 管理アカウントが consolidated billing アカウントを兼ねており、 適切な権限制御をする前提において 管理アカウントと Datadog とを連携するのみで事足りる Azure:コストを追いたい サブスクリプション に対し cost export を設定し、その結果を Datadog で収集する Google Cloud:Datadog 連携用のプロジェクトでコスト情報を BigQuery + Cloud Storage で扱えるようにした上でその内容を Datadog で収集する プロダクト向けのワークロードを稼動させているプロジェクトは同一の請求アカウントに紐付けるような構成としているので、追跡したいコスト *2 としてはこの請求アカウントに関するもの Datadog 連携用のプロジェクトもこの請求アカウントに紐付け(後述)、必要なコスト情報を単一プロジェクトに集約する格好とした さも最初から設計したかのように書いていますが、実際には殆どの要素を Datadog が提供するドキュメントに従ってのものになります。以後本稿でも基本的にはこの情報を前提にします。 https://docs.datadoghq.com/cloud_cost_management/setup/aws/ https://docs.datadoghq.com/cloud_cost_management/setup/azure/ https://docs.datadoghq.com/cloud_cost_management/setup/google_cloud/ なお Azure のみ単一の サブスクリプション を対象としていますが、これは以下背景によります。弊社事情である、というのが要旨です。 本番ワークロードを処理する サブスクリプション が Azure コストの支配的な要素を占めており、それ以外の サブスクリプション のコストを追う動機が薄い AWS / Google Cloud とは異なり Azure はプロダクト用途以外にもコーポレート用途の要素があり、SRE で管理できていない部分がある 具体的には管理グループ単位でのコスト監視設定を行うのが難しかった プロダクト向けのワークロードを考える範囲においては 1. の理由で特定の サブスクリプション を追跡できれば充分という背景があった 設定 基本的には前述の Datadog が提供する設定手順に従えばスムーズに設定できます。とくにコスト関連のお膳立てが整っていれば Datadog との連携およびコスト関連の情報取得はかなりスムーズに設定できます。 一方でコスト情報を適切に出力する為の作業で一部難儀したことがありました。これは作業者が AWS の経験に寄り過ぎていることも一因ですが、ひょっとしたら同じようなハマり方をする方がいるかもしれません。恥をしのんで解説します。 なお以下では実際に筆者が作業した過程をなぞるかたちでドキュメントベースで解説しますが、実際には Cloud Cost のアカウント設定画面 からウィザードに従って作業をすすめるほうが直感的だったりします。また文中で示すリンクテキストのリンク先は特記のない限り Datadog 公式ドキュメントです。 AWS 作業対象(前掲概要図から抜粋) Datadog の公式ドキュメントは こちら になります。やることはおおまかに Datadog と AWS アカウントとの連携設定 AWS アカウント側での Cost & Usage Report (CUR) の設定 Datadog 連携用 IAM ロールで 2. の CUR が参照できるよう権限の設定 Datadog 側での Cloud Cost 設定 の4点です。 ドキュメントでは CloudFormation を使う方法と手作業 (manual) で進める方法の2択がありますが、弊社では既存設定を Terraform で IaC 化しているためにて本作業も Terraform で進めるべく、手作業での設定としました。この場合に参考にすべき手順は こちら になります。 1. と 4. は上掲手順に従うのみで大丈夫そうでした。 2. と 3. *3 を Terraform で設定する場合のサンプルコードは以下のようになります。 variable "datadog_external_id" { type = string description = "External ID provided by Datadog on AWS integration" default = "" } data "aws_caller_identity" "current" {} data "aws_iam_policy_document" "datadog_integration" { statement { actions = [ "s3:PutObject" , "s3:GetBucketPolicy" , ] resources = [ aws_s3_bucket.datadog_integration.arn, "$ { aws_s3_bucket.datadog_integration.arn } /*" , ] principals { type = "Service" identifiers = [ "billingreports.amazonaws.com" , "bcm-data-exports.amazonaws.com" , ] } condition { test = "StringLike" variable = "aws:SourceArn" values = [ "arn:aws:cur:us-east-1:$ { data.aws_caller_identity.current.account_id } :definition/*" , "arn:aws:bcm-data-exports:us-east-1:$ { data.aws_caller_identity.current.account_id } :export/*" , ] } condition { test = "StringLike" variable = "aws:SourceAccount" values = [ data.aws_caller_identity.current.account_id, ] } } } resource "aws_s3_bucket" "datadog_integration" { bucket = "mntsq-master-datadog-integration" } resource "aws_s3_bucket_policy" "datadog_integration" { bucket = aws_s3_bucket.datadog_integration.id policy = data.aws_iam_policy_document.datadog_integration.json } /* Datadog の Cloud Cost では CUR 2.0 に対応していない legacy CUR の場合は以下リソースを使って定義する */ resource "aws_cur_report_definition" "datadog_integration" { report_name = "datadog-integration" time_unit = "HOURLY" format = "Parquet" compression = "Parquet" additional_schema_elements = [ "RESOURCES" , "SPLIT_COST_ALLOCATION_DATA" , ] s3_bucket = aws_s3_bucket.datadog_integration.bucket s3_prefix = "cost_and_usage_report" s3_region = aws_s3_bucket.datadog_integration.region } data "aws_iam_policy_document" "datadog_aws_integration_assume_role" { statement { actions = [ "sts:AssumeRole" ] principals { type = "AWS" identifiers = [ "arn:aws:iam::464622532012:root" ] # See: https://docs.datadoghq.com/integrations/guide/aws-manual-setup/?tab=roledelegation } condition { test = "StringEquals" variable = "sts:ExternalId" values = [ var.datadog_external_id, ] } } } data "aws_iam_policy_document" "datadog_aws_integration" { statement { actions = [ /* Datadog と AWS との連携 (integration) で何を連携するか(= Datadog にどのような AWS の情報を流すか)によって変わる 必要に応じて定義する。今回は詳細省略 */ ] effect = "Allow" resources = [ "*" ] } } data "aws_iam_policy_document" "cost_and_usage_report_export" { statement { effect = "Allow" actions = [ "s3:ListBucket" ] resources = [ aws_s3_bucket.datadog_integration.arn, ] } statement { effect = "Allow" actions = [ "s3:GetObject" ] resources = [ "$ { aws_s3_bucket.datadog_integration.arn } /cost_and_usage_report/*" , ] } statement { effect = "Allow" actions = [ "ce:Get*" ] resources = [ "*" ] } statement { effect = "Allow" actions = [ "cur:DescribeReportDefinitions" ] resources = [ "*" ] } statement { sid = "DDCloudCostListOrganizations" effect = "Allow" actions = [ "organizations:Describe*" , "organizations:List*" ] resources = [ "*" ] } } resource "aws_iam_policy" "datadog_aws_integration" { name = "DatadogAWSIntegrationPolicy" description = "Allow actions for Datadog Integration" policy = data.aws_iam_policy_document.datadog_aws_integration.json } resource "aws_iam_policy" "datadog_cost_and_usage_report_export" { name = "DatadogCurExportPolicy" description = "Allow actions required to export Cost and Usage Report to Datadog" policy = data.aws_iam_policy_document.cost_and_usage_report_export.json } resource "aws_iam_role" "datadog_aws_integration" { name = "DatadogAWSIntegrationRole" description = "Role to integrate AWS and Datadog" assume_role_policy = data.aws_iam_policy_document.datadog_aws_integration_assume_role.json } resource "aws_iam_role_policy_attachment" "datadog_aws_integration" { role = aws_iam_role.datadog_aws_integration.name policy_arn = aws_iam_policy.datadog_aws_integration.arn } # AWS マネージドポリシ。Datadog が Resource Collection 時に要求する data "aws_iam_policy" "security_audit" { name = "SecurityAudit" } resource "aws_iam_role_policy_attachment" "datadog_aws_integration_security_audit" { role = aws_iam_role.datadog_aws_integration.name policy_arn = data.aws_iam_policy.security_audit.arn } resource "aws_iam_role_policy_attachment" "datadog_aws_integration_cost_and_usage_report" { role = aws_iam_role.datadog_aws_integration.name policy_arn = aws_iam_policy.datadog_cost_and_usage_report_export.arn } Azure 作業対象(前掲概要図から抜粋) Datadog の公式ドキュメントは こちら になります。やることはおおまかに Datadog と Azure サブスクリプション との連携設定 Azure サブスクリプション での cost export 設定 cost export 結果を収容する storage container を Datadog 連携用 の app registration が参照できるよう設定 Datadog 側での Cloud Cost 設定 の4点です。 Azure については IaC 管理できていない範囲 *4 につき、手作業で設定をすすめてゆく必要がありました。なお本対応をおこなうまで Azure と Datadog とを組み合わせて利用するケースは弊社内において存在せず、その段階からの作業が必要となった点、補記しておきます。 1. については 手順 を通読することになります。弊社では "Quickstart (recommended)" によって作業をすすめました。 続く 2. ですが、 手順 に従い作業をする前に storage account blob container を用意しておく必要があります。Datadog と連携した Azure サブスクリプション 内に storage account を作り、その中に blob container を設ける格好です。cost export の結果は最終的に blob container 内に格納されます。上述二点が用意できたら前掲手順に従い cost export を設定します。いくつかフォーマットを設定できますが、弊社では AWS の CUR に合わせて Parquet を選定しました。 3. については 手順 に従い素直に設定すれば大丈夫です。Azure の世界観ではタブは 画面左側 にあるので、手順内で "tab" と説明されるものは画面左側にありますので注意が必要です *5 。 4. については手順通りで割合簡単に設定が可能なので詳細は省略します。 Google Cloud 作業対象(前掲概要図から抜粋) Datadog の公式ドキュメントは こちら です。以下5点が必要な作業となりました。本対応をおこなうまで Google Cloud と Datadog とを組み合わせて利用するケースは弊社内において存在せず、その段階からの作業が必要となった点、補記しておきます。 Datadog 連携用のプロジェクトを用意し、Datadog との連携設定を実施 cost export 設定を対象プロジェクト内の BigQuery から取り扱う設定を追加 用意したプロジェクトに対し請求アカウントからの cost export 設定を追加 BigQuery 出力先となる Cloud Storage バケット を対象プロジェクト内に追加 Datadog 用 service account から 4. で設定した Cloud Storage バケット が参照できるよう権限を設定 Datadog 側での Cloud Cost 設定 Google Cloud についても IaC 管理できていない範囲 *6 につき、手作業で設定をおこないます。 1. については 手順 を通読し必要な作業を行います。どのように Google Cloud プロジェクトを設計するかは個々のケースによって様々な方法論があると思いますが、弊社では素直に Datadog と連携するためのプロジェクトを新設する方向にしました。この際プロジェクトは適当な請求アカウントに紐付けないと Datadog との連携設定でエラーになるため、事前に請求アカウント向けの設定を Google Cloud 側の手順 などを参照して済ませておく必要があります。 2. については こちら が手順となりますが、ここでは既に BigQuery デー タセット が存在していることが前提になります。利用するプロジェクト内で事前に用意しておき、その上で 3. に臨みます。cost export 設定を 請求アカウントから 実施する必要があります。 4. と 5. は 手順 に従い素直に作業します。ここで手順内の "co-located" は BigQuery デー タセット と Cloud Storage バケット とでリージョン設定を同一にせよという意味になる *7 ので、そのように設定します。 6. においては手順に従えば作業は簡単にできます。以下が必要になるので適宜参照できるよう準備して設定に臨みます。 請求アカウント ID プロジェクト ID cost export 用の BigQuery デー タセット 名 BigQuery デー タセット 出力先の Cloud Storage バケット 名 なお BigQuery でコスト情報が取り扱えるようになる前に Cloud Cost 設定に臨むとエラーになります。おおむね数時間程度 *8 待つと BigQuery でコスト情報が取り扱えるようになるので、それ以降で設定を試みるとよいでしょう。 結果 全ての設定が済むと Cloud Cost 概要画面 で設定した クラウド ベンダー横断の状況が観察できるようになります。backfill 的な作業をしない限りは基本的に Cloud Cost 利用開始時点のコスト情報が連携されるので、これが真価を発揮するのは充分にコスト情報が蓄積されてからとなるでしょう。 Cloud Cost そのものの機能を使いこなすには弊社としてもまだ至っていません。有効にしてから日が浅く、まだ充分なデータが溜まっていないという側面があります。 一方で Datadog メトリックとして各 クラウド ベンダのコスト情報を適切な分解能でもって Datadog 内で扱える というメリットは Cloud Cost そのものの機能を差っ引いても享受でき、弊社ではひとまず AWS / Azure / Google Cloud 全体のコスト確認用 上記のうち主要なコストファクターとして扱われるアカウント / プロジェクトに絞ったコスト確認用 プロダクト環境単位でアカウント / プロジェクト / サブスクリプション を整理した上で「環境」を横断してのコスト確認用 といった区分で ダッシュ ボードをつくり、定期的に傾向をみる、といった運用をとっています。 3. が少々解り辛いのですが、要するに本番環境として扱われるお客様のワークロードを実際に捌いている AWS アカウント / Google Cloud プロジェクト / Azure サブスクリプション のコストを俯瞰して観察するというものです。 おわりに Datadog の Cloud Cost を使用し、社内で利用している各 クラウド ベンダーのコストを集約して観察する方法について取り扱いました。費用変動の痕跡をつぶさに追う上で参照すべき情報が散逸している状況は多くの苦労を伴いますが、ひとつの場所を確認しておけば間に合う、という状況は中々の 心理的 安全性が見込めます。Cloud Cost は SaaS コストや Datadog 自身のコストも対象に含めることが出来る *9 ので、さらなる観測範囲の拡張をおこない精度の高いコスト監視体制を組むことも可能そうです。弊社でも Cloud Cost の利用は開始したばかりといった状況なので、まずはデータの蓄積をしつつ、よりよい使い所を検討できればと考えています。 マルチ クラウド 構成におけるコスト監視に課題感をお持ちの方の対応検討の一助になれば幸いです。 MNTSQ 株式会社 SRE 秋本 *1 : 詳細な数字は割愛しますが AWS の利用コストは Google Cloud / Azure に比べ桁が1つ異なります *2 : もちろん「プロダクト向けのワークロード」を稼動させているプロジェクトが対象です。というのも弊社ではこれ以外にもプロジェクトが様々な用途で存在し、それらプロジェクトは別の請求アカウントに紐付きます。これら全てのコスト情報を集約させると話が大きくなってしまうので、今回はワークロード稼動環境に絞るものとしました *3 : IAM ロール / ポリシ関連のコードがあることから推測できるとおり、実際には 1. に対応する範囲も含んでいます *4 : Terraform でやりたい *5 : 本稿筆者はこれでけっこうハマりました。実は CUI が主な生活空間なので、 GUI 操作には苦手意識があります *6 : Terraform でやりたい2 *7 : 恥ずかしながらドキュメントから誘導される https://docs.cloud.google.com/bigquery/docs/exporting-data#data-locations を読んでも勝手が解らず少々ハマりました。英語的なニュアンスが汲み取れればよかったのですが…… *8 : だいたい Google Cloud 側で設定完了してから 4 -- 5 時間といったところでした。状況による変動がかなりあると推測されるので、ひとつの参考事例とご理解ください *9 : https://docs.datadoghq.com/cloud_cost_management/
アバター
はじめに ソフトウェアエンジニアの森山です。 CloudFront で API と静的ファイルを別オリジンで扱い Single Page Application(以下 SPA)を ホスティング する構成について解説します。 プライベート API を遮断する設定を追加する際に CloudFront のカスタムエラーレスポンスで躓きました。しかし CloudFront Functions を活用することで上手く切り抜けることができました。 構成 リク エス トのパスを元にCloudFrontでオリジンへのアクセスを振り分けています。 S3でSPAのアセットを ホスティング し、ALB経由でECSでバックエンドの API へ疎通しています。 ハマりポイント /api/private/* へのリク エス トのみALBで遮断する設定を施しました。しかし、何度 API コールしても ステータスコード 200が返却され遮断できていないように見えました。試しにWAFで遮断する設定をしても結果は同じく ステータスコード 200が返却されました。 AWS の設定反映が遅延していることを疑いましたがWAFには /api/private/* のリク エス トはブロックしている アクセスログ が記録されていました。 原因 原因は以下のCloudFrontのカスタムエラーレスポンスでした。 上記はオリジンからの ステータスコード 403,404のレスポンスを ステータスコード 200に上書きしindex.htmlを返却します。 カスタムエラーレスポンスを設定しているとCloudFrontがレスポンスを返却する 直前に上書き するのでALBでブロックしてもその後に200に上書きしたレスポンスが返却されます。 余談ですが、動作確認の反省ポイントは HTTPメソッド: HEAD の API を使っていたことです。GETで確認していればレスポンスがindex.htmlに上書きされることの早期発見に繋がったはずです。 カスタムエラーレスポンスの意図 ではなぜこんな設定をしていたのか。 一言で言うと / 以外のリク エス トで index.html を返却するためです。 これはフロントエンドがSPAであることが関係しています。SPAは名の通り1つのhtml(今回はindex.html)を起点に動作します。画面遷移は JavaScript で実装されたrouterが担います。routerがURLを管理し、動的importが評価されるたびにブラウザがアセットのリク エス トを飛ばします。 SPAの場合はこの起点となるindex.htmlありきです。 当たり前ですがindex.htmlがユーザーに届けられなければ、アプリケーションのエントリーポイントが無いので後続のjsや css を読み込めません。 CloudFrontでSPAを ホスティング している場合、ルートページ以外への直接アクセスで index.html が存在しない状態が発生し得ます。 例えば https://mntsq.com/hoge というURLへrouterを介さずにブラウザのURLバーへ直接入力してリク エス ト、新規タブで画面遷移、またはリロード等です。 その場合は、 GET: https://mntsq.com/hoge リク エス トが CloudFront へ飛びます。 しかしS3は /hoge というオブジェクトが無いのでエラーを返します。非認証なら403、認証後なら404です。結果としてブラウザに index.html が届けられなくなってしまいます。 これを回避するためにカスタムエラーレスポンスを設定していました。 しかし、カスタムエラーレスポンスの対処ではマルチオリジン構成の場合に課題があります。 カスタムエラーレスポンスの課題 カスタムエラーレスポンスはS3以外のオリジンへのレスポンスも上書きしてしまうことです。 カスタムエラーレスポンスはCloudFrontの ディストリビューション に対して設定されます。 CloudFrontにおいて ディストリビューション とオリジンの以下の関係性です。 ディストリビューション S3 オリジン API オリジン S3だけでなく API のレスポンスが403や404の場合にも上書きされてしまいます。そうなるとクライアント側で API のエラーハンドリングもできません。 解決策 CloudFront Functionsで リク エス ト を上書きします。 CloudFront Functionsはカスタムエラーレスポンスと違い ディストリビューション 事ではなくビヘイビア毎に紐つけることができます。 つまり /* へのリク エス トだけを上書きし、 /api/* へのリク エス トに対しては何もしないという制御ができます。 処理の分割としては以下のようになります。 CloudFront FunctionsではCloudFrontのリク エス トやレスポンスに JavaScript で軽量な処理を挟むことができます。 シーケンス図内の⑨ 必要に応じてindex.htmlを要求の箇所は、拡張子の有無で識別できました。 そのため、拡張子の無いリク エス トはindex.htmlのリク エス トに上書きするというCloudFront functionを以下のように実装しました。 /** * SPAの初回ロードリクエストに対して/index.htmlを返す関数 * * SPAでは新規タブやリロードでルート以外のURLに初回アクセスするとindex.htmlが返らず画面がホワイトアウトするため、 * この関数は初回リクエストを/index.htmlに書き換える。 */ function handler ( event ) { const request = event . request const uri = request . uri const pathTail = uri . substring ( uri . lastIndexOf ( "/" ) + 1 ) // 最後の/以降のパス const hasExtension = pathTail . includes ( "." ) // パスに.が含まれているか(ファイル拡張子があるか) if ( ! hasExtension ) { // 拡張子無しは、新規タブやリロードによるルートページ以外の初回ロードリクエスト // SPAにおけるエントリーポイントである/index.html に書き換える request . uri = "/index.html" } return request } } CloudFront Functionsで注意すべき点は JavaScript のランタイム環境です。 JavaScript ランタイム1.0と2.0の2種類ありますがどちらも ECMAScript 5.1に準拠しています。 大規模で レイテンシー の影響を受けやすい CDN カスタマイズのための軽量な関数を記述できるとあります。( link ) この軽量さを実現するためにランタイム環境として使える機能も厳選されているようなので注意が必要です。( link ) 上記のCloufFront Functionsは JavaScript ランタイム2.0で動作しています。 まとめ カスタムエラーレスポンスからCloudFront Functionsに切り替えることでマルチオリジンのSPAを ホスティング し、 API のエラーも適切にハンドリングできるようになりました。 元々カスタムエラーレスポンスで403,404を200に上書きするという設定に違和感がありましたが、直感の通りでした。CloudFront Functionsは AWS コンソール上にテスト機能があり非常に助かりました。 MNTSQの仕事にご興味を持たれた方は、ぜひ 採用情報のページ をご覧ください。
アバター
こんにちは、 MNTSQ ( モンテスキュー ) で アルゴリズム エンジニア(AIエンジニア)をしている清水です。 MNTSQのプロダクトをLLMネイティブなプロダクトに進化させるべく、LLMOpsに関する実装が増えてきた今日この頃です。 これらの実装の過程で、 複数の MCP サーバーに接続してセッションを管理するにはどのような実装がベストか? という問題にぶつかりました。 自前でラッパークラスを実装するしか方法はないのか、と思っていたのですが、 MCP Python SDK に ClientSessionGroup というクラスがあることを発見したので、これを使うと良さそうだという結論になりました。 私が調べた限りでは、 ClientSessionGroup の使用方法について紹介している記事などは見つけられませんでした。そのため、本記事で MCP Python SDK の ClientSessionGroup の仕様や使い方、 ClientSession との違いなどを整理してまとめてみました。 前提 MCP の実装にはAnthropicの MCP Python SDK のバージョン1.19.0を使用しています。今後のバージョンアップによって仕様が変更する可能性がある点にご留意ください。 github.com 単一 MCP サーバーと接続するサンプルコード 単一の MCP サーバーと接続するミニマムなサンプルコードを以下に示します。このサンプルコードは公式 GitHub で紹介されている サンプルコード を少し改変したものです。このサンプルコードを複数の MCP サーバーと接続できるように拡張し、比較する形式で説明します。 import asyncio from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client async def main (): mcp_server_url = "http://127.0.0.1:8000/echo/mcp" async with streamablehttp_client(mcp_server_url) as (read_stream, write_stream, _): async with ClientSession(read_stream, write_stream) as session: await session.initialize() tools = ( await session.list_tools()).tools print (f "Available tools: {[tool.name for tool in tools]}" ) result = await session.call_tool( name= "echo" , arguments={ "message" : "Hello, world!" } ) print (f "Call tool result: {result}" ) if __name__ == "__main__" : asyncio.run(main()) streamablehttp_client を使用して MCP サーバーに接続し、 ClientSession を使用してセッションを作成しています。 MCP サーバーが一つだけならば、このサンプルコードをほぼコピペする形で実装できるのですが、 複数 MCP サーバーの場合はどのように実装すれば良いでしょうか? ClientSessionGroup を使って複数の MCP サーバーと接続する このような用途のために ClientSessionGroup が用意されています 。公式の APIリファレンス では以下のように紹介されています(筆者による日本語訳)。 複数の MCP サーバーへの接続を管理するためのクライアントクラス。 このクラスは、サーバー接続の管理機能を カプセル化 する役割を担う。接続されたすべてのサーバーから提供されるツール、リソース、およびプロンプトを集約する。 先ほどのサンプルコードを複数 MCP サーバーへと接続できるように拡張すると以下のようになります。 import asyncio from mcp import ClientSessionGroup from mcp.client.session_group import StreamableHttpParameters async def main (): mcp_server_urls = [ "http://127.0.0.1:8000/echo/mcp" , "http://127.0.0.1:8001/math/mcp" , ] # 1. `ClientSession`の代わりに`ClientSessionGroup`を使用して`session_group`を作成 async with ClientSessionGroup( component_name_hook= lambda name, server_info: f "{server_info.name}.{name}" ) as session_group: for url in mcp_server_urls: # 2. `ClientSessionGroup.connect_to_server`を使い、MCPサーバーと接続・セッションを作成 await session_group.connect_to_server(StreamableHttpParameters(url=url)) # 3. `ClientSessionGroup.tools`から全てのツールにアクセスする tools = session_group.tools.values() print (f "Available tools: {[tool.name for tool in tools]}" ) tool_names = session_group.tools.keys() print (f "Tool names: {tool_names}" ) # 4. `ClientSessionGroup.call_tool`で各種ツールを実行する result = await session_group.call_tool( name= "EchoServer.echo" , args={ "message" : "Hello, world!" } ) print (f "Call tool result: {result}" ) result = await session_group.call_tool( name= "MathServer.add_two" , args={ "n" : 10 } ) print (f "Call tool result: {result}" ) if __name__ == "__main__" : asyncio.run(main()) 単一 MCP の場合との差分を一つずつ見ていきます。 1. ClientSession の代わりに ClientSessionGroup を使用して session_group を作成 ClientSession を インスタンス 化するときとは異なり read_stream や write_stream などは不要です。空のセッショングループを作成し、後からセッションを追加していく方式であるためです。また、 MCP サーバー間でのツール名の衝突を避けるために、 component_name_hook を与えることができます。 2. ClientSessionGroup.connect_to_server を使い、 MCP サーバーと接続・セッションを作成 ClientSessionGroup.connect_to_server に MCP サーバーのURLを渡すだけで、内部で MCP サーバーに接続し、セッションが作成されます 。以下に connect_to_server メソッド内部の処理を一部抜粋します。 session_stack = contextlib.AsyncExitStack() try : # 中略 else : client = streamablehttp_client( url=server_params.url, headers=server_params.headers, timeout=server_params.timeout, sse_read_timeout=server_params.sse_read_timeout, terminate_on_close=server_params.terminate_on_close, ) read, write, _ = await session_stack.enter_async_context(client) session = await session_stack.enter_async_context(mcp.ClientSession(read, write)) result = await session.initialize() 実装を確認すると、単一 MCP の場合と同様に streamblehttp_client で MCP サーバーに接続 ClientSession でセッションを作成 ClientSession.initialize で初期化 を実行していることがわかります。このように 単一 MCP の場合と同じように作成したセッション をコンテキストスタックに追加することで、複数セッションを管理しています。 3. ClientSessionGroup.tools から全てのツールにアクセスする ClientSessionGroup.tools で全てのツールにアクセスできます。dictを返すので .values() をつけることで、 単一 MCP の場合の (await session.list_tools()).tools と同等のオブジェクト を得ることができます。ただし、以下のような相違点があります。 ClientSessionGroup は、 connect_to_server の実行時に、内部で ClientSession.tool_list を呼び出します。 ClientSessionGroup.tools はすでに取得済みのツール群を返すpropertyであり、その場で ClientSession.tool_list を呼び出しているわけではありません。 ClientSessionGroup.tool は {”ツール名”: Toolオブジェクト} のdictを返します。この”ツール名”は component_name_hook 関数によって付けられた名前です。一方で、 Tool オブジェクトの name 属性は元のツール名のまま であることに注意してください。後に call_tool を実行するときは component_name_hook 関数によって付けられた名前で呼び出す必要があります。よって LLMに Tool オブジェクトを与えるときも、 component_name_hook 関数によって付けられた名前に差し替えた Tool オブジェクトを与える 必要があります 1 。 また、promptやresourceについても上記の仕様が当てはまります。 4. ClientSessionGroup.call_tool で各種ツールを実行する ClientSessionGroup.call_tool は ClientSession.call_tool とほぼ同等に使うことができます 。以下に ClientSessionGroup.call_tool のコードを抜粋します。 async def call_tool (self, name: str , args: dict [ str , Any]) -> types.CallToolResult: """Executes a tool given its name and arguments.""" session = self._tool_to_session[name] session_tool_name = self.tools[name].name return await session.call_tool(session_tool_name, args) 見ての通り内部の実装は簡素なものです。与えられたツール名がどのセッションに属するツールかを解決する処理が挟まっています。この点を除けば、単に ClientSession.call_tool を呼び出す関数と言って良いでしょう。 まとめ 本記事では MCP Python SDK の ClientSessionGroup について紹介しました。ぜひ開発の参考にしていただけたら幸いです。 MNTSQでは、プロダクトをLLMネイティブに進化させるべく、LLMエージェントを搭載した新機能や、LLMの運用・改善のための基盤(LLMOps)を鋭意開発・構築中です。もしMNTSQの仕事にご興味を持っていただけたら、 ぜひお気軽にカジュアル面談でお話ししましょう! careers.mntsq.co.jp note.mntsq.co.jp tech.mntsq.co.jp この記事を書いた人 清水健 吾 MNTSQ アルゴリズム エンジニア LLMのご機嫌と格闘する日々です。 私はこの仕様に気付かず時間を溶かしてしまいました。 MCP のSpecificationでも複数サーバー間でのツール名の衝突に関する仕様は定まっておらず、現状では component_name_hook を与えずに実装するのが無難かもしれません。 ↩
アバター
はじめに 構成 ログ送出 ログ保管 GuardDuty 関係 分析 結果確認 実際の運用 分析系 行動系 おわりに はじめに MNTSQ はそのサービスの性質(「契約」の集約、一元管理、活用)上、セキュリティの維持と向上が至上命題です。よってセキュリティ改善において強いモチベーションが存在します。 今回の取り組み以前にも AWS ベストプ ラク ティスに沿った AWS アカウントの管理や各種ログの収集は行われていましたが、収集済みログの活用やセキュリティ系の AWS 各サービス運用には改善の余地が多々ありました。今回ここにテコ入れし、現状に寄り添った運用ができるように改善することを目論みました。 ここでいう「現状に寄り添った」運用とは以下のようなことを指します。 少ない人員でも無理のない範囲で状況把握ができること 管理の手間がなるべく発生しないようなシンプルな構成であること 機微情報に対するアクセス状況を追跡したいなど、上記を鑑みてもなお守りたいセキュリティ要件を達成できるような仕組みが整備できること 構成 おおむね以下のような構成をとっています。アイコンがいっぱい並んでいて「管理の手間がなるべく発生しない」構成なのかには議論の余地がありそうですが、 AWS マネージドサービスを多用する構成につき、方針自体は問題ないと思います。 図中の AWS アカウントの区分は以下のようになります。 member:実際にアプリケーションが動きワークロードを捌く環境(= AWS アカウント) security:セキュリティ関係の諸々を集約している AWS アカウント 各種ログを分析する Athena 関連リソースはここに置く AWS Organizations で管理可能なセキュリティ系 AWS サービスの delegated admin はこのアカウントに設定する master: AWS Organizations 管理アカウント セキュリティ文脈では然程関係ないが上述 "security" な AWS アカウントとの関連で言及 構成図内の要素を大別すると以下のような部位に分けられるでしょう。 ログ送出 ログ保管 GuardDuty 関係 *1 分析 結果確認 以後それぞれについて解説します。 ログ送出 ログ送出関連要素をハイライトした図 分析の項で別途詳述しますが、基本的には S3 バケット にログを置き、それを Athena によって検索 / 分析するような手法をとっています。つまり収集対象としたいログは S3 に置く必要があります。 標準で保存先を S3 に指定可能なサービスであれば話は簡単ですが、一部そうでないものもあります。上記構成図でいえば Route 53 公開 DNS クエリログが該当しました。こちらについての取り組みは拙稿の以下を参照ください。 tech.mntsq.co.jp ログ保管 ログ保管関連要素をハイライトした図(大変見辛いのですが S3 のあたりを強調しています) S3 バケット にログを置くようにさえ出来れば IAM ポリシ / S3 バケット ポリシで定義される権限調整を頑張る前提で Athena から扱えるようになります。Athena と S3 バケット とは同一の AWS アカウントにあってもよく、また別個であっても構いません。とはいえ管理の手間や認知負荷などを考えればどちらか一方(= 同一アカウントに置くか別個のアカウントに置くか)に運用方針を寄せるのがベターでしょう。 弊社では現状を鑑みて無理に運用方針を統一させるのは止し、以下のように2つを並立させるようにしました。 既に長年にわたりログが蓄積されており、過去ログへアクセスできることが運用上でメリットになるもの Athena / S3 とで別アカウントに置くことを許容 AWS WAF v2 ブロックログや ALB アクセスログ などが該当 新規にログ取得を開始したもの Athena / S3 それぞれ同一 AWS アカウントで取扱 前述のとおりセキュリティ関係の諸々は単一 AWS アカウント (security) で管理しており、この文脈において Athena はセキュリティ用 AWS アカウントで管理されます。 Athena と同一の AWS アカウントに置かれる S3 バケット は security に、別個の AWS アカウントに置かれる場合は member に、それぞれ存在することになります。 GuardDuty 関係 GuardDuty 関係の関連要素をハイライトした図 ここは至極単純で、GuardDuty をほぼ吊るしで使うのみです。本番環境用の AWS アカウントでは S3 向けの malware protection *2 も有効にしています。前述のとおりセキュリティ用 AWS アカウントを delegated admin *3 に設定し、他の AWS アカウントをすべて member 扱いにします。member / delegated admin 問わず、すべての GuardDuty 検出内容 (finding) はセキュリティ用 AWS アカウントの GuardDuty で管理します。 分析 分析関連の要素をハイライトした図 各ログ別にデータベースを、環境別にテーブルを、それぞれ Athena 上に選択します。データベースにあわせて workgroup も分けるようにしました。これは以下効果を狙ってのものになります。 Athena ログ検索結果を活用するにあたり、結果の格納先をログの種類別に分けたかった。これを手間なくやる *4 には workgroup 単位で保存先を指定する必要があった workgroup を定義することで Athena クエリ実行状況を EventBridge によって追跡することができ、後処理をイベント駆動的に実施できるようになる利点があった *5 Athena によるログ分析は週次での定期実行とし、ログ分析用 Athena クエリを名前付きクエリとして整備したうえで Lambda から当該クエリを呼び出すようにしています。 名前付きクエリではなく生のクエリを使うことで EventBridge スケジューラ *6 を使えるメリットがあるのですが、今回は EventBridge イベント(cron 記法によるスケジュール実行)+ Lambda 関数による実行の仕組みを整備しています。 これは定期実行したクエリ内容を適宜改変して詳細なログ調査を行いたい状況も多々あり、この改変に用いるクエリの元ネタを割合簡単に扱えるよう整備するには、名前付きクエリとして管理するのがベターと思われた為です。 結果確認 結果確認関連要素をハイライトした図 ログの内容や分析結果、およびそれらからどういった情報を得たいかによって、Athena クエリ結果の取り扱い方が変わります。弊社ではおおむね以下のような分類ができました。 ログ検索結果としてごく少量の情報が得られ、そこから直ちに ネクス トアクションが決められるもの ログ検索結果から得られる情報が量的にそれなりの規模になってしまうものの、結果を踏まえて ネクス トアクションが決められるもの ログ検索結果から得られる情報が量的にそれなりの規模になってしまい、かつ ネクス トアクションが決めづらいもの 各結果を実際にどう取り扱っているかについては後述しますが、上記3分類において、以下のような取り扱い方を整備することにしました。項番は前項3点にそれぞれ対応します。 検索結果とあわせて対応すべき内容を示した手順とを含めて Slack に流し、通知された結果をみて適宜対処 1. と同様 結果確認用のアプリケーションを整備し、傾向の変化を観察する。異常が見られた場合は都度対処 要するに何をすればよいかが明確なものは通知して対処し、何をすべきか悩むものについては傾向をみるためのお膳立てを整える、といった指針になります。 なお 3. で示した結果確認用のアプリケーションは S3 にコンテンツを置き CloudFront で配信し Cognito で認証認可を行う SPA を用意し、これに Athena クエリ結果を浚わせて可視化するといった体制を組んでいます。このあたりの裏側については拙稿以下を参照ください *7 。 tech.mntsq.co.jp 実際の運用 ひとまずここまでで各種ログを取り扱うまでの仕組みについては解説できました。ここからは得られたものをどう運用しているかについて解説します。主に2つの区分の運用があります。 分析系 Route 53 公開 DNS クエリログ調査 Route 53 リ ゾル バクエリログ調査 VPC フローログ調査 WAF ブロックログ調査 行動系 機微情報を収容する S3 バケット 操作状況調査 Redash 操作状況調査 GuardDuty 検知内容調査 分析系 Route 53 クエリログ / VPC フローログ / WAF ブロックログを所定の方針で Athena によって週次で調査し、その結果を観察したうえで適当なアクションをとります。これら分析系の運用は前掲の ログ検索結果としてごく少量の情報が得られ、そこから直ちに ネクス トアクションが決められるもの ログ検索結果から得られる情報が量的にそれなりの規模になってしまうものの、結果を踏まえて ネクス トアクションが決められるもの ログ検索結果から得られる情報が量的にそれなりの規模になってしまい、かつ ネクス トアクションが決めづらいもの という3分類においては 2. と 3. に該当します。2025年10月現在、各ログはそれぞれ以下のような指針 / 手段でもって分析結果を取り扱うようにしています。 種類 方針 取り扱い方の分類 (前項 1. , 2. , 3. ) 運用方法 Route 53 公開 DNS クエリログ調査 NXDOMAIN ログの傾向観察 *8 3. 集計結果を自前の SPA で捌き観察 Route 53 リ ゾル バクエリログ調査 継続して通信実績のある ドメイン 宛以外に VPC 内からインターネットへ出るリク エス トが無いか確認 2. 未知 ドメイン があれば詳細を確認し、既知 ドメイン はアクセス許可リスト *9 に追加 VPC フローログ調査 VPC 内からインターネットへ出る際のアクセス先に不審なものがないかを AS のレベルで確認 3. 集計結果を自前の SPA で捌き観察 WAF ブロックログ調査 ブロック状況に変動が無いかを傾向観察 *10 3. 集計結果を自前の SPA で捌き観察 Route 53 クエリログについては 拙稿 も参照ください。本稿二度目の参照につきリンクテキストにて失礼します。 また SPA をつかった調査をしている事例は実際の風景をお見せできればよかったのですが、マスクすべき箇所があまりに多く、有益なものを提示できなさそうだったので、泣く泣く省略します。 行動系 ログ調査結果から直ちにアクションがとれるもの(つまり前掲の分類でいう 1. )がここに該当します。なお GuardDuty についてはログ云々の取り扱いではなく、GuardDuty が finding を上げたタイミングがアクションをとるトリガとなります。以下のような指針 / 手段になります。 種類 方針 運用方法 機微情報を収容する S3 バケット 操作状況調査 特定の S3 バケット を対象に CloudTrail にて data event *11 を収集し、この結果として操作が認められた場合に背景などを確認 週次で前週分の操作ログを棚卸しし、操作対象者を特定の上で背景を非同期で確認 *12 Redash 操作状況調査 Redash ログイン時ログおよび Redash クエリ実行ログを収集の上、勤務時間帯ではないタイミングでの操作が無いかを確認 週次で前週分の操作ログを棚卸しし、操作対象者を特定の上で背景を非同期で確認 GuardDuty 検知内容調査 GuardDuty 検知事項を素直に調査する GuardDuty がなにがしかを検知したタイミングで調査タスクとして GitHub issue が作成され、issue ベースで内容を調査 *13 Redash ログに関しては以下をもとにした内容を使い収集 / 運用しています。 tech.mntsq.co.jp おわりに 弊社における AWS 各種ログの集約と活用、およびそれらを踏まえたセキュリティ系改善の取り組みについて解説しました。半年程度でここまでやったのだなと思う向きもありますし、まだまだやれた乃至やれることもあるよなとも感じています。 もちろんここまでに解説した内容については様々な改善余地があり、また現時点でも着手出来ていない施策が数多くあります。たとえば傾向観察が主体となっている取り組みについては掴んだ傾向を踏まえて具体的なアクションをとってゆく必要があり、GuardDuty / CloudTrail 以外にもこの種の用途では有効な AWS のセキュリティ関連サービスがあるはずです。よりよい組み合わせを模索し、今後もセキュリティ方面の最適化に取り組んでゆきたいと考えています。 記事中で扱った取り組みにおいて、特に何らかの制限を課す方向の取り組みはしていない向きに気付かれた方がいらっしゃるかもしれません。これは制約を課すセキュリティを目的とするものではなく、発生している事実を正確かつ適切なタイミングで把握するという方面に重きを置いている面があるためです。予防的統制のための取り組みであると言ってもよいかなと思います。 AWS サービスの各種ログの活用やセキュリティ施策の取っ掛りとして、本稿が何らかのお役に立てば幸いです。 MNTSQ 株式会社 SRE 秋本 *1 : GuardDuty はじめセキュリティ系の AWS サービスは admin / member 間の関係による "finding" の蓄積という考え方をし、ログとは異なるため、別物として扱っています *2 : https://docs.aws.amazon.com/guardduty/latest/ug/gdu-malware-protection-s3.html *3 : https://docs.aws.amazon.com/guardduty/latest/ug/delegated-admin-designate.html に従い設定します *4 : クエリ実行時に結果保存先を指定する手法は https://docs.aws.amazon.com/athena/latest/ug/query-results-specify-location.html によれば AWS マジネジメントコンール上では使えますがそれ以外では使えません。後述のとおり Athena クエリの実行は定期処理とするため、この手法は使えませんでした *5 : https://docs.aws.amazon.com/athena/latest/ug/athena-events.html が詳しいです *6 : https://docs.aws.amazon.com/scheduler/latest/UserGuide/what-is-scheduler.html *7 : 余談ですが当該記事において「ちょっとした要件」とした内容は本件を指すものでした。伏線(?)回収でした *8 : 存在しない ドメイン へのアクセス試行状況から attack surface を探る傾向に変動がないかを確認する狙いがあります *9 : いままでは Athena 上に Route 53 リ ゾル バクエリログとの突き合わせを目的とした DB を整備し、その中身である CSV ファイルをアクセス許可リストとして運用していましたが、つい最近 Route 53 リ ゾル バ DNS ファイアウォール を使う運用に改めました。 https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver-dns-firewall-overview.html が詳しいです。これもいずれ記事にできればと思います *10 : ブロック量が急激に増えてきた場合は攻撃試行が増えてきたという見方ができるので、そのあたりを追跡する狙いがあります *11 : https://docs.aws.amazon.com/awscloudtrail/latest/userguide/logging-data-events-with-cloudtrail.html *12 : Slack 等テキストベースで「これこれこういう操作をされたようで、どんな感じです?」と伺うような感じです。 性善説 をとっています *13 : 前述の構成図からこのあたりの要素が漏れていました。GuardDuty イベントを EventBridge でフックし、Lambda 関数を呼び出し、 GitHub の API を呼び出し issue を起票するようにしています。GuardDuty 検知事項対処用の GitHub Project を整備したうえでタスク管理をおこなっています
アバター
はじめに スキーマ分離と行分離 目的と結論 目的 結論のサマリ 試験内容 試験環境とツール 負荷の設計 本番環境でのクエリ傾向の分析 QPSの測定 進め方 試験結果 スキーマ分離のボトルネック スキーマ数を固定して負荷をあげてみる 結果まとめ なんとか延命したい はじめに 弊社が採用しているDB設計は、テナントごとに独立した スキーマ を持つ 「 スキーマ 分離」 のデータ構造に基づいています。この アーキテクチャ は、高いデータ分離性とセキュリティを確保できる一方で、 「 スキーマ 数の増加に伴ってパフォーマンスが劣化する」 という性質が指摘されます。 サービスのスケールにおいてこの「性能劣化」が、いつ、どのように顕在化するのかは、設計上の大きな課題でした。この漠然としたリスクを 定量 的に評価し、将来的な「行分離」 アーキテクチャ への移行の是非を判断することを目的に、 負荷試験 を実施しました。 本記事では、この試験で明らかになった、 スキーマ 分離 アーキテクチャ の抱える本質的な ボトルネック を共有させていただきます。 ※ MySQL では"SCHEMA"よりも"DATABASE"という呼び方が一般的ですが、本記事では宗教上の理由により" スキーマ "と表記させていただきます スキーマ 分離と行分離 マルチテナントのデータベース設計において、代表的なデータ分離方式は「 スキーマ 分離」と「行分離」の2つです。弊社が現在採用しているのは「 スキーマ 分離」方式です。 それぞれの分離方式の特性を比較します。 スキーマ 分離・行分離の特性比較 弊社サービスは、元々シングルテナントで運用していた過去があり、 スキーマ 分離を選択しました。しかし、マルチテナントのリアーキテクトが完了し、事業がスケールするフェーズに入ったことにより、 スキーマ 分離の抱えるリスクを無視できなくなってしまいました。 目的と結論 目的 記事冒頭に記載した通り、現在の スキーマ 分離データ構造が、サービスのスケールに伴い、どの段階で性能上の限界を迎えるのかを明確にし、 アーキテクチャ 移行の要否と期限を定めることが目的です。 結論のサマリ 性能劣化の要因は、 クエリの絶対量よりも スキーマ 数の増加にある 弊社ケースでは600テナント数を超えると、 データディクショナリ への メタデータ アクセスへの待機時間が顕著 になり、これが性能劣化の支配的な要因となる 試験内容 試験環境とツール 試験環境の構成は基本的にRDS+EC2のみです。 試験対象のRDS (Aurora MySQL ) 8.0. mysql _aurora.3.08.2 db.r6g.2xlarge 本番環境の平均的なテナントを模したデータ量(169テーブル、7.8GiBくらい)の スキーマ を複製する 負荷をかけるEC2 インスタンス OSは不問 (今回は Ubuntu を使用) スペックはかけたい負荷の関係でc6gn.2xlargeから最終的にc6gn.8xlargeまで上げた また、 負荷試験 には qube という オープンソース のツールを使用しました。このツールは一つの スキーマ にしか負荷をかけられないため、これを複数 スキーマ に対して並列実行し、出力されたレポートを集計する bash スクリプト を、以前記事にしたDevinに作成してもらいました。 集計レポートはこんな感じ { " StartedAt ": " 2025-09-25T07:30:32.531620003Z ", " FinishedAt ": " 2025-09-25T08:42:16.475152837Z ", " ElapsedTime ": 1800 , " ParallelCount ": 24 , " TargetSchemaCount ": 150 , " QueryCount ": 44652323 , " ErrorQueryCount ": 0 , " AvgQPS ": 847.79 , " TotalQPS ": 24806.84 , " Duration ": { " P50 ": " 304.22µs ", " P75 ": " 364.25µs ", " P95 ": " 639.14µs ", " P99 ": " 886.44µs ", " P999 ": " 1345.26µs ", " Avg ": " 343.97µs ", " Max ": " 37678.64000µs ", " Min ": " 141.706µs ", " TotalSamples ": 44652323 } } tech.mntsq.co.jp 負荷の設計 本番環境でのクエリ傾向の分析 MySQL では performance_schema.events_statements_summary_by_digest というテーブルで、クエリを正規化して集計した情報を見ることができます。 (例えば WHERE id = 120 を WHERE id = * として、値部分が異なるクエリも同じクエリとみなします)弊社サービスの場合は、本番環境のクエリの80%以上が、上位11種類のクエリで占められていました。この11種類のクエリ比率を崩さないように、具体的な値を入れて SQL 文を10000個程作成し、qubeで実行可能なjsonlファイルを作成しました。(この作業もDevinで行いました。生成AIの進化に感謝です) なお、 SQL をどの程度用意すべきかはケースバイケースです。今回のケースでは1 スキーマ に数百QPSの負荷を5秒程度かけて次の スキーマ に負荷をかけるということを行うので、10000種類程度用意すれば同じ SQL が何度も流れることはないだろうと判断しました。MySQL8以降はクエリキャッシュの概念がないので、ここまでシビアになる必要はなかったかもしれませんが、キャッシュが効く環境だと、この点も考慮しないと意味のない 負荷試験 になってしまうので注意しましょう。 QPSの測定 クエリ傾向とは別に、本番環境での QPS (Query Per Second) を測定しました。 SHOW GLOBAL STATUS LIKE 'Questions' という SQL で、 MySQL が起動してから受け付けたクエリの総数がわかります。サービスがよく利用される時間帯でこの値の増分を記録し、1秒あたりに直すことでQPSを見積もることができます。時期・曜日など、サービスの利用のされ方が異なる複数の日の結果を平均するのが好ましいでしょう。 弊社サービスでは約3200QPS程度でした。 進め方 スキーマ 数に比例して負荷を増やしていき、目標QPSを達成できない点を見極めます。 スキーマ 数: 150 → 300 → 600 → 1200と増やしていき、限界点が見えたらその間の設定も追加で検証する QPS: 3,200 → 6,400 → 12,800 → 25,600と増やしていく( スキーマ 数=テナント数にQPSが比例するという厳しめの条件) 試験ツールは、指定の数だけスレッドを立ち上げ、各スレッドは指定したローテーションの時間だけ スキーマ に負荷をかけ、終わったら次の スキーマ に負荷をかけに行きます。ローテーションごとにレポートを保存し、全実行が終わり次第全てのレポートを集計した統合レポートを出力します。今回の試験では、 スキーマ ごとに5秒程度の負荷をかけてローテーションし、全体で30分ほど測定を行いました。 # 4並列の場合 Thread1: schema_1 → schema_5 → …. Thread2: schema_2 → schema_6 → …. Thread3: schema_3 → schema_7 → …. Thread4: schema_4 → schema_8 → …. 試験結果 スキーマ 分離の ボトルネック 弊社のケースだと、600 スキーマ を超えたあたりで急激な性能劣化が見られました。縦軸が対数目盛りである点に注意してください(遅延は1目盛り増えると10倍になる) スキーマ 数の増加 対 クエリ遅延のグラフ 800 スキーマ 以上では目標QPSまで負荷を上げることができなかったので、600 スキーマ 時点の負荷設定(クライアント側で24並列, 合計12000QPSをかける設定)で固定してデータを取りました。クライアント側の設定を固定しても、達成できるQPSは800テナント以降では顕著に低下していきました。 また、1200 スキーマ の負荷をかけていた時間帯のパフォーマンス インサイト を見て、遅延の原因を考察してみます。簡単に見方を説明すると、灰色の破線がCPUのキャパシティで、これを超えているとよろしくない状態であると言えます。問題なく稼働しているDBでは、破線よりも下のラインに収まっているはずです。 1200 スキーマ 負荷試験 時の待機時間の内訳 このグラフを見ると、dict_sys_mutexやparser_mutexなどの待機時間が支配的になっていることがわかります。これは、MySQL8.0以降では、 データディクショナリ テーブルが mysql . ibd という単一の InnoDB テーブルスペースに保存される仕様に起因していることが考えられます。 dict_sys_mutexは、 データディクショナリ へのアクセスが競合状態となった際に発生する待機イベントで、例えばオープンテーブルキャッシュに載ってないテーブルにアクセスする場合などに データディクショナリ へのアクセスが必要になります。テーブル数の増加に伴い、この際に競合が発生しやすくなります。 parser_mutexは、クエリのパースの際にテーブル・カラムなどの情報を データディクショナリ から取得する際の競合イベントで、やはりこちらもテーブル数の増加に伴い顕著になります。 スキーマ 数を固定して負荷をあげてみる データディクショナリ への メタデータ アクセスが性能劣化の原因になるならば、 スキーマ 数(= メタデータ のサイズ)を固定すれば、より大きな負荷を捌けるはずです。 スキーマ 数を600に固定し、当初予定していた1200 スキーマ 相当の負荷(24000QPS)を捌けるかの測定も行ってみました。 結果は以下のとおりです。今度は縦軸は線形目盛りです。 負荷(QPS) 対 クエリ遅延のグラフ 24000QPSの 負荷試験 実施時の待機時間の内訳 当初予定していた1200テナント想定の負荷である24000QPSも余裕で達成できました。また データディクショナリ へのアクセス待機時間は目立たなくなり、CPU待機時間が支配的な、健全なものであることが確認できます。 スキーマ 数(= メタデータ のサイズ)を固定すれば、より大きな負荷を捌けるはずという仮説は正しそうです。 結果まとめ 以上のことから、弊社のケースだと600テナント程度の収容が限界点であることが確認でき、 負荷試験 の目的を達成できました。また、MySQL8.0以降の スキーマ 分離のデータ構造は、 スキーマ 数(テナント)の増加が性能の ボトルネック になるという定性的な事実を数値的に理解することができました。 例えば、 スキーマ 数が数百程度までしか増加しない、 スキーマ あたりのテーブル数が多くない、などの場合は、「 スキーマ 分離・行分離の特性比較」の表で示したメリットを享受するために、 スキーマ 分離のデータ構造を選択するのもありなのかもしれません。しかし、 スキーマ 数が大きくスケールすることが予想されるサービスでは、行分離のデータ構造を採用することが無難と言えるでしょう。 なんとか延命したい とはいえ、 スキーマ 分離で作ってしまったものを行分離に作り直すのはかなり骨が折れる作業になります。弊社でも長期的にはリアーキテクトが必要という認識にはなりましたが、中期的な事業計画・ 工数 の観点から、なんとか延命措置を図れないかという議論が生まれました。 そこで、 今回の試験結果を AWS のソリューションアーキテクト(SA)の方に共有をしたところ、以下のようなアド バイス を頂けました! table_open_cache, table_definition_cache の調整: オープンテーブルキャッシュのサイズを増やすことで、 データディクショナリ の参照頻度および競合発生頻度を抑えられるかもしれない innodb _sync_array_size の調整: 待機中のスレッドの数が多いワークロードの同時実行性が高まるので、競合の待機時間が短くなるかもしれない インスタンス タイプやストレージタイプの変更: r6g → r7g,r8gにすること、ストレージ設定をAurora I/O-Optimized に変更することなどで、パフォーマンスの向上が見込める 根本的な解決にはなりませんが、これらのチューニングを行うことで、現在よりも アーキテクチャ 移行のデッドラインが後ろにずれる可能性があります。これらの調整を行ってみて、どのような結果が得られたかについては、また次回報告させていただきます! MNTSQ株式会社 SRE 西室 ↓次回 tech.mntsq.co.jp
アバター
こんにちは。 「すべての合意をフェアにする」MNTSQの森山です。 この度、MNTSQでリードエンジニアを務めることになりました。 リードエンジニアは、チームの出力を最大化するためにあらゆる角度からデリバリーを支えます。責任を持つのは「コードの品質」だけではなく、「チームとして成果を出すこと」です。そのために必要な技術的・組織的な取り組みをリードしていきます。 エンジニアの役割は、会社やチームによって定義が少しずつ異なります。そこで今回は、 MNTSQにおけるリードエンジニアとは何か を、自分自身の整理も兼ねて 言語化 してみました。 なぜリードエンジニアという役割が生まれたのか? これまで弊社には「リードエンジニア」という役割は存在しませんでした。 近年、AIの進化によって開発の生産性を高める新しい手段が次々と生まれています。やりたいこと、試したいことが増える一方で、単にエンジニアの人数を増やすだけでは期待通りに成果が伸びないという課題も見えてきました。エンジニアの人数が正しくデリバリーに寄与するためにはデリバリーにフォーカスする役割が必要だという考えからリードエンジニアという役割が新たに設けられました。 いままでは、テッ クリード が技術的な意思決定とともにデリバリーの責任も担い、負担が大きい状態にありました。今後は、テッ クリード は アーキテクチャ レベルで技術的な責務を担い、リードエンジニアがデリバリーの責務を担います。そうすることでテッ クリード はより技術的な意思決定にフォーカスできるようになります。 リードエンジニアは担当領域のデリバリー及びテッ クリード と相談の上でそれに伴う技術的な意思決定の責務を負います。相談は担当領域の難易度やリードエンジニア自身のスキルによって範囲が変わります。テッ クリード はリードエンジニアの責任範囲を適宜広げていくことも役割の一つであり、リードエンジニアはその範囲を広げることを目標とします。こうしてリードエンジニアを軸とした組織拡大が、より大きな成果につながることを目指しています。 またリードエンジニアは若手ソフトウェアエンジニアが次のステップへ進むためのキャリアの登竜門としても位置づけられています。内部登用も積極的に推進し、エンジニアが成長し続けられる環境を整えるという狙いもあります。 役割の具体像 例えば以下のような役割を担います。 アーキテクチャ ・技術の選定や エス カレーション 内部品質の担保 開発フローの整備 リソースの割り当て 技術タスクのコスト・納期の 言語化 ・共有 …etc 技術的な意思決定については、チーム内で完結できるものはメンバーを巻き込みながら主体的に結論を導きます。一方で、システム全体への影響が大きい判断についてはテッ クリード へ エス カレーションし、意思決定に必要な情報を提供します。 また、継続的な開発速度を維持するためには、内部品質への意識も欠かせません。将来的な ボトルネック を防ぐことも必要です。例えば「この負債を解消することでA機能の開発コストを○人日削減できる」といった形で、技術的負債の解消に対する 費用対効果を明確にしてPdMやデザイナーと共有する ことも積極的に提案します。 さらに、チーム全体の生産性を長期的な目線で高めるために、誰がどのタスクを担当するとスムーズに開発が進むか等も見極め、チームのリソースを配置します。 姿勢と マインドセット リードエンジニアとして大切にしたい姿勢や マインドセット についても整理しました。考えていたことを的確に 言語化 されていた こちらの記事 を一部、参考にしています。 ❌️ 最も優秀なプレイヤーであるべき ⭕️ 全体を俯瞰し、サポートに徹する リードエンジニアは、誰よりも優秀で、誰よりも多くのチケットを消化できることが必須ではありません。チーム作業の停滞を招く ボトルネック を解消し、将来的なリスクを先回りしてケアすることで、チーム全体の出力を最大化することに責任を持ちます。 ❌️ 難しいタスクを自分が担当するべき ⭕️ 難しいタスクもメンバーに任せる 技術調査や重めの機能実装、不確定要素の多いタスクこそメンバーを信頼して任せます。 リードエンジニア自身は、軽微な修正や方針が見えているバグチケットを消化しながら、リソースに余白を残しておきます。その余白を活かして突発的な ボトルネック の解消やリスクケアに対応し、チーム全体の出力向上に貢献します。 (もちろん全任せではなく、必要に応じて自らも難しいタスクを担います。) ❌️ コーディングは最小限にして管理に専念する ⭕️ コーディング・レビューも欠かさずやる 技術的な ボトルネック やプロセス上の問題を把握するためには、技術スキルとコード理解が不可欠です。またリードエンジニアは組織的にもフラットな立場であるため、 権威ではなく影響力をもってリーダーシップを発揮 することが求められます。 ❌️ チーム内の技術的な決定を一手に引き受ける ⭕️ チームで最善の決定ができるように情報整理、提案する 自らが結論を下して共有するのではなく、メンバーを巻き込みながら意思決定のプロセスを支えます。その過程と結論の双方にチームがオーナーシップを持てるよう導くことが、リードエンジニアの重要な役割です。 まとめ リードエンジニアは、時に自ら手を動かし、時に協力を仰ぎながら、 デリバリーを維持・向上させるために課題を見つけ、解決する存在 です。 今後、この役割を担う人数こそが、会社全体のデリバリー速度を左右する重要な役割になると考えています。 ただし、エンジニアだけでの開発を高速化することは容易ではありません。MNTSQでは、デザイナーやPdMなど様々な役割のメンバーと協力しながら、長期的なデリバリーの維持・向上を実現しています。 MNTSQの仕事にご興味を持たれた方は、ぜひ 採用情報のページ をご覧ください。
アバター
バックエンドエンジニアの河久保です 2日間にわたる Kaigi on Rails 2025 お疲れ様でした 今回の会場が東京駅 丸の内南口 から徒歩1分で着く会場だったので、中央線(一番丸の内寄りにホームがある)ユーザーの私としてはものすごくアプローチが良くて最高でした 次回は渋谷(神泉寄り)とのことで、 井の頭線 使うかなぁーとか考えながら帰途に就いてました 今回の聴講スタイル 今回の Kaigi on Rails では聴講したすべてのセッションを スマートフォン で録音し、終わり次第 Notebook LM に音声データを渡すということを実践してみました 日英のセッション問わず文字起こし精度も良く、音声まとめ、レポート作成といったもの良い体験が得られました 以下の スクリーンショット は『 Sidekiq その前に:Webアプリケーションにおける非同期ジョブ設計原則 / morihirok | Kaigi on Rails 2025 』の音声データをソースとしてテキストレポートを作成したものになります 「Sidekiq その前に:Webアプリケーションにおける非同期ジョブ設計原則」の音声データを流し込んだもの セッション中も PC のタイピングなどに気が削がれることがなかったので、今後のカンファレンスもこのスタイルで臨もうと思えるものでした 聴講したセッションの紹介 さて、ここからは本編の感想になります 聴講したセッションからいくつか所感交えて取り上げます kaigionrails.org 他のセッションでも言及されていたのですが、今年の Kaigi on Rails は「非同期ジョブ」に関するセッションが多かったです こちらのセッションは非同期ジョブのバックエンドを Delayed から Rails 8 から導入された Solid Queue に移行するというものでした ここで紹介されていた障害事例( トランザクション 内で perform_later)は弊社でも踏んでいたので、「わかる〜〜」と頷きながら聞いていました 弊プロダクトのコードを見てみると以下のような module でケアされていました module TransactionAwareClient extend ActiveSupport :: Concern include AfterCommitEverywhere included do around_enqueue do |_, block| after_commit do block.call end end end end 今は Rails 7.2 移行に update されているので enqueue_after_transaction_commit オプションに切り替え済みです フレームワーク の進化に感謝です kaigionrails.org Rails 8.0 で本体から提供されるようになった認証プロセスの generator で生成されるコードを使って、 Rails がどんな処理を行っているかをステップを追って解説する内容でした ブルートフォース アタックへのケアとして、 Rails 7.2 でサポートされた controller の rate limit を利用したり、わざとハッシュ生成を遅らせる機構が入れていたりするということでした またタイミング攻撃への考慮もされているよという紹介もありました セッションは User と 1 : 多関係の Session というモデルで DB 管理で 理由は セッションハイジャック 時にログインセッションを削除する際に redis データを全件調べる必要があり遅くなるからとのこと セッション情報は ActiveSupport ::CurrentAttributes を継承した Current というモデルでスレッドローカルな グローバル変数 として格納するという実装になっている パスワードリセットの挙動についても丁寧に説明いただきました これだけみても devise にお任せしたくなる気持ちはとてもわかりますが、 Web アプリケーションを開発する身としては知っておくべき内容なので、普段触っている フレームワーク のコードでレクチャーいただけるとても良い内容でした kaigionrails.org セッション終了して即座に社の Slack に投稿してしまうくらい印象に残ったセッションでした セッション終了後にさっそく社内投稿したところ module による table prefix を連動させた分離は packwerk を用いない形でコードとDB スキーマ への責務を見た目で分かるようになるので、導入してみたい気持ちが沸々と湧いてきました 運用面の事例紹介も大変学びが多かったです バッチ処理 のリスト化 目的や起動タイミング・頻度にとどまらず、何か異常ステータスになった際の リカバリ 緊急度、リトライ可否といった内容が記載することを MUST として、属人化を防ぐ取り組みをされていました エラート ラッキング 弊社も Datadog によるエラート ラッキング をやっていますが、エラーの軽重含めて通知量が多かったり、あまり理解できていない ドメイン の通知が飛んで来たりして取りこぼしがあったりします これに対してはオーナーシップを持たせた旗振り役のがんばりで通知チャンネルの正常化させたというエピソードが紹介されていました Runbook 障害対応やデータメンテといった本番環境の作業手順書の作成だけにとどまらず、 Runbook の利用回数、作業時間なども記録していました これにより複数回実施されていたり、対応時間のかかるものに対して運用の見直しや恒久対応に向けた 定量 指標に基づいた トリアージ が行われるということでした さいごに 9/30 に社内でのプチ参加報告会の機会をいただき、発表スライドを使って私の方から口頭にていくつかのセッションを取り上げて紹介させていただきました 早くみんなで同時視聴して 感想戦 したいので、 アーカイブ が公開されることを待ち望んでいます
アバター
はじめに 弊社では Entra ID ユーザ / グループを使い AWS 利用時の認証や権限制御を IAM Identity Center を使い実現しています。Entra ID と IAM Identity Center を SCIM で連携させることで Entra ID 側の情報を用いて達成しており、このあたりは 拙稿 に詳細があります。 IAM Identity Center は自身が ID プロバイダ(以下 IdP と書きます)になることもでき、この場合 SAML / OAuth2 で外部アプリケーションとやりとりすることが可能です。このあたりも 拙稿 として存在します。 今回、社内のちょっとした要件で SPA(single page application の SPA です。本稿題名含め以下でも同様)を作成する事情があり、これに対しアクセスが可能なメンバを社内でも絞っておきたい需要があったので、このあたりの制御を IAM Identity Center で行わせるようにしてみました。ただし SPA に直接 IAM Identity Center とやりとりさせるのは骨が折れるので、何らかの仲立ちが欲しいところです。そこで Cognito を使い、以下を実現しました。 SPA へのアクセス時に Cognito user pool + IAM Identity Center による認証を行い、アクセスしたユーザが SPA を使う上で妥当な権限を持っているか認証 この時の「認証」の材料には Entra ID ユーザ / グループを使う SPA 用にユーザが払い出されるわけではなく、既に Entra ID 上に有るユーザを使用した SSO という認証体験になる Cognito identity pool + IAM ロールによって SPA が AWS 上で実行可能なアクションを制御 これにより、SPA 動作にあたり 認証の為のバックエンドを特に設ける事なく 、また IAM Identity Center を IdP とすることで Entra ID 側には触れずに Entra ID ユーザ / グループを用いた認証が可能 という構成をとることができました。このあたりを tips として紹介できればと思います。 構成 はじめに今回整備した諸々の全容を示します。 SPA の中身には立ち入りません。 AWS の適当なサービスを触る必要がある(= IAM クレデンシャルが必要)ものと理解ください。 SPA 自体はシンプルで、S3 バケット に配置したものを CloudFront で配信しているのみになります。CloudFront を経由しないアクセスを防ぐよう origin access control による制限 *1 を加えています。 認証に関しては Cognito の user pool *2 に依拠しています。以下要領です。 user pool の IdP として IAM Identity Center を指定 SPA からは Cognito を隠蔽せず、アクセスされた際に Cognito が用意するログインページへリダイレクトし、Cognito での認証が完了したのちに SPA へ戻ってくるように構成 Cognito による認証(= IdP での認証)が成功しアクセス トーク ンが返ってきた場合は SPA は正規のコンテンツを、失敗した場合はアクセス拒否時のコンテンツを返却 認証が完了したのち、得られた トーク ンと identity pool *3 とを使い、クレデンシャルを得、SPA は得られたクレデンシャルをもとに AWS API を叩きます。このとき identity pool には SPA に執り行わせたい IAM アクションを許可した IAM ロールを authenticated role として設定しておき、払い出されたクレデンシャルの効力範囲を適当に制限しておく構想としました。 ポイント 上述した内容、特に Cognito の user pool / identity pool はドキュメントに従った素直な使い方につき、特段の補足は必要無いと思われます。一方で Cognito と IAM Identity Center とを連携させる方面については少々難儀しました。この周辺について説明します。 IAM Identity Center を Cognito の IdP としてどのように設定するか Cognito の user pool では特定のベンダに依存しない IdP 連携方法として SAML か OpenID Connect が選択できます *4 。IAM Identity Center ではこの種の連携を行う際にアプリケーションを用意し設定するのですが *5 、ここでの選択肢には SAML か OAuth 2.0 かのみです *6 。 共通要素としては SAML になり、もちろんこれでうまくいくので、 Cognito で使う IdP として IAM Identity Center を使う場合は SAML で連携させればよい と承知しておいて頂ければ、本稿の内容は8割カバーできます。 アプリケーションの作成に関しては AWS 公式ドキュメントを読むのが手っ取り早いです。以下が該当します。 docs.aws.amazon.com アプリケーション作成後に設定する各種設定に関しては以下が詳しいです。 repost.aws そもそも「認証」をどのようにするか ちょっと大袈裟な節名で自分でも困惑していますが、要は「Cognito が IAM Identity Center の結果を認証結果とするなら IAM Identity Center は何をもって認証するの?」ということです。 これは単純に今回用意した IAM Identity Center アプリケーションに紐付けされているユーザ / グループであれば OK という整理にしてあります。つまりは Redash の SSO ログインに関する拙稿 の整理と同様に これには IAM Identity Center 内でアプリケーションというものを用意し、アクセスさせたいユーザ / グループをアプリケーションに紐付けることで達成 という方針としました。Cognito から IAM Identity Center に処理が遷移した際、 SAML 連携に使用している IAM Identity Center アプリケーションに紐付けされているユーザであればアプリケーションはそのまま処理を通してくれるので、Cognito 側では特に何も考えず、連携だけを気にしておけばよくなり、話が単純になります。 繰り返しになりますが、弊社は IAM Identity Center を Entra ID と連携させ、IAM Identity Center で使えるユーザ / グループは Entra ID のそれを引き継いでいます。 よって Entra ID 側の情報を前提に「認証」に必要な条件を構成することで、IAM Identity Center でもそれを引き継いで構成することが可能なようになっています。この際 Entra ID には一切触れないで済ませることが可能です。 コード例 おまたせしました。コード例を示します。 今回は Terraform コードに加え、 あくまで参考として SPA コードのうち Cognito を取り扱う箇所を抜粋で掲載します。SPA は Vue.js を使い作っており *7 。記法のお作法(主に 環境変数 周辺)は Vite のそれに従ったものになります。 Terraform 前述の アクセスさせたいユーザ / グループをアプリケーションに紐付ける の方針をグループ単位で実施させる前提のコードです IAM Identity Center 側 こちらをクリックして参照のこと locals { entra_id_groups = { spa = [ "test" # SPA 利用を許可したい Entra ID グループ名を指定 ] } } /* IAM Identity Center インスタンスに対し必要な設定が行われる これは Terraform リソースでの管理が難しいので、設定は AWS マネジメントコンソール上から行い、Terraform からは参照に留める */ data "aws_ssoadmin_instances" "main" {} resource "aws_ssoadmin_application" "spa" { name = "SPA" description = "SAML application for SPA" application_provider_arn = "arn:aws:sso::aws:applicationProvider/custom-saml" instance_arn = tolist (data.aws_ssoadmin_instances.main.arns) [ 0 ] portal_options { visibility = "ENABLED" sign_in_options { origin = "IDENTITY_CENTER" } } } data "aws_identitystore_group" "spa" { for_each = toset (local.entra_id_groups.spa) identity_store_id = tolist (data.aws_ssoadmin_instances.main.identity_store_ids) [ 0 ] alternate_identifier { unique_attribute { attribute_path = "DisplayName" attribute_value = each.key } } } resource "aws_ssoadmin_application_assignment" "spa" { for_each = toset (local.entra_id_groups.spa) application_arn = aws_ssoadmin_application.spa.arn principal_id = data.aws_identitystore_group.spa [ each.key ] .group_id principal_type = "GROUP" } Cognito ほか SPA 側で必要とする側 こちらをクリックして参照のこと data "aws_s3_object" "saml_metadata" { /* コード管理していない。手でアップロードすること IAM Identity Center アプリケーションの "IAM Identity Center SAML metadata file" から取得する */ bucket = aws_s3_bucket.static_files.id key = "saml-metadata/spa.xml" } resource "aws_iam_saml_provider" "spa" { name = "spa" saml_metadata_document = data.aws_s3_object.saml_metadata.body } resource "aws_cognito_identity_pool" "spa" { identity_pool_name = "spa" allow_unauthenticated_identities = false # 認証されていないIDを無効化 saml_provider_arns = [ aws_iam_saml_provider.spa.arn ] cognito_identity_providers { client_id = aws_cognito_user_pool_client.spa.id provider_name = "cognito-idp.$ { data.aws_region.current.region } .amazonaws.com/$ { aws_cognito_user_pool.spa.id } " server_side_token_check = false } } data "aws_iam_policy_document" "assume_from_cognito" { statement { effect = "Allow" principals { type = "Federated" identifiers = [ "cognito-identity.amazonaws.com" ] } actions = [ "sts:AssumeRoleWithWebIdentity" ] condition { test = "StringEquals" variable = "cognito-identity.amazonaws.com:aud" values = [ aws_cognito_identity_pool.spa.id, ] } condition { test = "ForAnyValue:StringLike" variable = "cognito-identity.amazonaws.com:amr" values = [ "authenticated" ] } } } /* Cognito Identity Pool と連携して一時認証情報を取得するための IAM ロール このロールを Cognito Identity Pool の Authenticated Role として設定 */ resource "aws_iam_role" "allow_actions_for_spa" { name = "allow-actions-for-spa" assume_role_policy = data.aws_iam_policy_document.assume_from_cognito.json } data "aws_iam_policy_document" "allow_actions_for_spa" { statement { # 略 } } # データバケットへの読み取り専用ポリシーをアタッチ resource "aws_iam_role_policy" "allow_actions_for_spa" { name = "allow-actions-for-spa" role = aws_iam_role.allow_actions_for_spa.id policy = data.aws_iam_policy_document.allow_actions_for_spa.json } # 認証されたユーザーにデフォルトロールを割り当てる resource "aws_cognito_identity_pool_roles_attachment" "spa" { identity_pool_id = aws_cognito_identity_pool.spa.id roles = { "authenticated" = aws_iam_role.allowed_actions_for_spa.arn } } resource "aws_cognito_user_pool" "spa" { name = "spa" } resource "aws_cognito_user_pool_domain" "spa" { user_pool_id = aws_cognito_user_pool.spa.id domain = "test" # 適宜変更する } # これは SPA かどうかに関係なく使えるはずなので "main" に改めておく resource "aws_cognito_identity_provider" "main" { user_pool_id = aws_cognito_user_pool.spa.id provider_name = "iam-identity-center" provider_type = "SAML" provider_details = { "MetadataFile" = data.aws_s3_object.saml_metadata.body } attribute_mapping = { "email" = "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress" } lifecycle { ignore_changes = [ provider_details, # MetadataFile を解釈して設定される値が延々差分を生じさせるので ignore する ] } } # Cognito User Pool のクライアント(SPAからログインするための設定) resource "aws_cognito_user_pool_client" "spa" { name = "SPA" user_pool_id = aws_cognito_user_pool.spa.id explicit_auth_flows = [ "ALLOW_USER_AUTH" , "ALLOW_CUSTOM_AUTH" , "ALLOW_USER_SRP_AUTH" , "ALLOW_REFRESH_TOKEN_AUTH" , ] allowed_oauth_flows_user_pool_client = true allowed_oauth_scopes = [ "openid" , "email" , ] allowed_oauth_flows = [ "code" ] supported_identity_providers = [ aws_cognito_identity_provider.main.provider_name, ] token_validity_units { access_token = "minutes" id_token = "minutes" refresh_token = "days" } callback_urls = [ "https://$ { var.spa_domain } /path/to/app" ] # 適宜変更する default_redirect_uri = "https://$ { var.spa_domain } /path/to/app" # 適宜変更する } resource "local_file" "spa" { content = <<EOT VITE_AWS_REGION=$ { data.aws_region.current.region } VITE_USER_POOL_ID=$ { aws_cognito_user_pool.spa.id } VITE_USER_POOL_CLIENT_ID=$ { aws_cognito_user_pool_client.spa.id } VITE_IDENTITY_POOL_ID=$ { aws_cognito_identity_pool.spa.id } VITE_COGNITO_DOMAIN=$ { aws_cognito_user_pool_domain.spa.id } .auth.$ { data.aws_region.current.region } .amazoncognito.com VITE_REDIRECT_URI=https://$ { var.spa_domain } /path/to/spa EOT filename = "./path/to/spa/.env" # 適宜変更する file_permission = "0644" } おまけとして S3 および CloudFront に関するコードも掲載しておきます こちらをクリックして参照のこと resource "aws_s3_bucket" "spa" { bucket = "test" # 適宜変更する } # CloudFront からのアクセスを許可する S3 バケットポリシーを定義 data "aws_iam_policy_document" "spa" { statement { effect = "Allow" principals { type = "Service" identifiers = [ "cloudfront.amazonaws.com" , ] } actions = [ "s3:GetObject" , ] resources = [ "$ { aws_s3_bucket.spa.arn } /*" , ] condition { test = "StringEquals" variable = "AWS:SourceArn" values = [ aws_cloudfront_distribution.spa.arn, ] } } } /* OAC (Origin Access Control) を使った CloudFront 経由のアクセスを許可するバケットポリシー このポリシーにより、S3バケットへの直接アクセスが拒否される */ resource "aws_s3_bucket_policy" "spa" { bucket = aws_s3_bucket.spa.id policy = data.aws_iam_policy_document.spa.json } resource "aws_cloudfront_origin_access_control" "spa" { name = "oac-for-spa" description = "OAC for SPA S3 bucket" origin_access_control_origin_type = "s3" signing_behavior = "always" signing_protocol = "sigv4" } data "aws_cloudfront_cache_policy" "caching_optimized" { name = "Managed-CachingOptimized" } data "aws_cloudfront_cache_policy" "caching_disabled" { name = "Managed-CachingDisabled" } resource "aws_acm_certificate" "spa" { provider = aws.us-east-1 domain_name = "*.test.example" # 適宜変更する validation_method = "DNS" lifecycle { create_before_destroy = true } } resource "aws_acm_certificate_validation" "spa" { provider = aws.us-east-1 certificate_arn = aws_acm_certificate.spa.arn validation_record_fqdns = [ for record in aws_route53_record.spa_cert_validation : record.fqdn ] } resource "aws_cloudfront_distribution" "spa" { enabled = true tags = { Name = "spa" } aliases = [ var.spa_domain, ] origin { domain_name = aws_s3_bucket.spa.bucket_regional_domain_name origin_id = aws_s3_bucket.spa.id origin_access_control_id = aws_cloudfront_origin_access_control.spa.id } default_cache_behavior { allowed_methods = [ "GET" , "HEAD" , "OPTIONS" ] cached_methods = [ "GET" , "HEAD" ] target_origin_id = aws_s3_bucket.spa.id viewer_protocol_policy = "redirect-to-https" # 今回の SPA では Cookie やヘッダーを考慮しないので転送もしない cache_policy_id = data.aws_cloudfront_cache_policy.caching_optimized.id function_association { event_type = "viewer-request" function_arn = aws_cloudfront_function.spa.arn } } # JavaScriptファイルを処理 ordered_cache_behavior { path_pattern = "path/to/app/assets/*.js" allowed_methods = [ "GET" , "HEAD" , "OPTIONS" ] cached_methods = [ "GET" , "HEAD" ] target_origin_id = aws_s3_bucket.spa.id viewer_protocol_policy = "redirect-to-https" cache_policy_id = data.aws_cloudfront_cache_policy.caching_optimized.id } # SPA の HTML ファイルや画像、その他アセットを処理 ordered_cache_behavior { path_pattern = "path/to/app/*" allowed_methods = [ "GET" , "HEAD" , "OPTIONS" ] cached_methods = [ "GET" , "HEAD" ] target_origin_id = aws_s3_bucket.spa.id viewer_protocol_policy = "redirect-to-https" cache_policy_id = data.aws_cloudfront_cache_policy.caching_disabled.id } restrictions { geo_restriction { restriction_type = "none" } } viewer_certificate { acm_certificate_arn = aws_acm_certificate.spa.arn ssl_support_method = "sni-only" } } # SPAのパスを書き換える CloudFront Function resource "aws_cloudfront_function" "spa" { name = "spa-router" runtime = "cloudfront-js-1.0" comment = "Rewrites SPA paths to index.html" publish = true code = <<EOT function handler(event) { var request = event.request; var uri = request.uri; // ファイル拡張子を持つパスはスキップする(例: .js, .css, .png) if (uri. split ('.'). pop () !== uri) { return request; } // リクエストパスが特定のSPAプレフィックスで始まるかチェック if (uri.startsW ith ('/path/to/app')) { request . uri = '/path/to/app/index.html'; } return request; } EOT } SPA コード 認証部分 const CLIENT_ID = import . meta . env . VITE_USER_POOL_CLIENT_ID ; const COGNITO_DOMAIN = import . meta . env . VITE_COGNITO_DOMAIN ; # Cognito の user pool に対し設定できるドメイン(hosted UI アクセス時のドメインとして見える)の ID。環境変数経由で渡す const REDIRECT_URI = import . meta . env . VITE_REDIRECT_URI ; # SPA の URL。Cognito で認証が成功した場合に戻ってくるため必要。環境変数経由で渡す export function redirectToHostedUI () { const url = new URL ( `https:// ${ COGNITO_DOMAIN } /login` ) ; url . searchParams . set ( "client_id" , CLIENT_ID ) ; url . searchParams . set ( "response_type" , "code" ) ; url . searchParams . set ( "scope" , "openid email" ) ; url . searchParams . set ( "redirect_uri" , REDIRECT_URI ) ; window . location . href = url . toString () ; } export async function handleCognitoRedirect () { const params = new URLSearchParams ( window . location . search ) ; const code = params . get ( "code" ) ; if ( ! code ) return sessionStorage . getItem ( "id_token" ) ; // すでに保持している場合 const body = new URLSearchParams ({ grant_type : "authorization_code" , client_id : CLIENT_ID , redirect_uri : REDIRECT_URI , code , }) ; const tokenResp = await fetch ( `https:// ${ COGNITO_DOMAIN } /oauth2/token` , { method : "POST" , headers : { "Content-Type" : "application/x-www-form-urlencoded" } , body : body . toString () , }) ; if ( ! tokenResp . ok ) throw new Error ( "Failed to fetch tokens" ) ; const tokens = await tokenResp . json () ; sessionStorage . setItem ( "id_token" , tokens . id_token ) ; return tokens . id_token ; } クレデンシャル取得部分 import { S3Client } from "@aws-sdk/client-s3" ; # S3 を触る場合の例として記載 import { CognitoIdentityClient , GetIdCommand , GetCredentialsForIdentityCommand } from "@aws-sdk/client-cognito-identity" ; const REGION = import . meta . env . VITE_AWS_REGION ; # AWS リージョン。Cognito リソースが存在する場所。環境変数経由で渡す const USER_POOL_ID = import . meta . env . VITE_USER_POOL_ID ; const IDENTITY_POOL_ID = import . meta . env . VITE_IDENTITY_POOL_ID ; export async function getS3Client () { const idToken = localStorage . getItem ( "id_token" ) ; if ( ! idToken ) throw new Error ( "User not authenticated" ) ; const client = new CognitoIdentityClient ({ region : REGION }) ; const identityResp = await client . send ( new GetIdCommand ({ IdentityPoolId : IDENTITY_POOL_ID , Logins : { [ `cognito-idp. ${ REGION } .amazonaws.com/ ${ USER_POOL_ID } ` ] : idToken , } , }) ) ; const credResp = await client . send ( new GetCredentialsForIdentityCommand ({ IdentityId : identityResp . IdentityId , Logins : { [ `cognito-idp. ${ REGION } .amazonaws.com/ ${ USER_POOL_ID } ` ] : idToken , } , }) ) ; return new S3Client ({ region : REGION , credentials : { accessKeyId : credResp . Credentials . AccessKeyId , secretAccessKey : credResp . Credentials . SecretKey , sessionToken : credResp . Credentials . SessionToken , expiration : credResp . Credentials . Expiration , } , }) ; } おわりに Cognito から IAM Identity Center を IdP として参照し、これを使用して SPA へのアクセス制御を実施する例について解説しました。 S3 + CloudFront で配信する諸々のアクセス制御として真っ先に浮かんでしまうのが私的にはアクセス元 IP アドレスによる境界制限( AWS WAF v2 などを使う想定)なのですが、IAM Identity Center に依拠したユーザ認証という選択肢が今回割合気軽に使え、なかなか現代的なアクセス制御ができるようになったと思っています。 セキュリティ的な方面から考えても 境界防御ではなく認証による制限ができる 既存の IdP の構成を使用して認証の設計ができ、ID 運用の分散が防げる 既に IdP 上に存在するユーザを使っての SSO という挙動になり利用者の体験をあまり損わない といったメリットがあり、見通しのよい構成となりました。 IAM Identity Center / Cognito l活用の一助として本稿が参考になれば幸いです。 MNTSQ 株式会社 SRE 秋本 参考 IAM Identity Center と Cognito を統合してみた 先行事例です *1 : https://docs.aws.amazon.com/ja_jp/AmazonCloudFront/latest/DeveloperGuide/private-content-restricting-access-to-s3.html *2 : https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-user-pools.html *3 : https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-identity.html *4 : https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-user-pools-identity-federation.html?icmpid=docs_cognito_console_help_panel *5 : https://docs.aws.amazon.com/singlesignon/latest/userguide/manage-your-applications.html *6 : https://docs.aws.amazon.com/singlesignon/latest/userguide/customermanagedapps-saml2-oauth2.html *7 : 特に選定に理由はなく、とりあえず動くものとサクッと作りたかった程度です
アバター
はじめに 弊社では BI ツールとして Redash を EC2 上でセルフホストするかたちで利用しています。CS や Sales 等の部門で日々の指標を追うのに使われるだけでなく、開発や運用のためのアラーティングにも活用されています。 今回この Redash へ SSO ログインの仕組みを導入し、Redash 利用者の操作履歴を 部分的に 監査できるようにしました。SSO の IdP としては掲題通り IAM Identity Center を選定しています。これら対応について記録します。 なぜ IAM Identity Center を IdP に? 去る2025年5月下旬に以下記事を公開しました。 この中で ユーザから見た場合に各 AWS アカウントへのアクセスが透過的になるのも IAM Identity Center 導入での嬉しさのひとつですが、IAM Identity Center 自体がアイデンティプロバイダ (IdP) としての利用も可能であるという点から、SRE 主体での他のサービスへの SSO 化の試みについても着手し易くなってきました。今後は社内で独自のユーザ管理体系が存在する諸々のサービスを着実に SSO 化し、利用者の利便性向上やセキュアな体制構築に繋げてゆこうと計画しています。 という一節があります。弊社では Redash を SRE と CRE とが連携して運用しており、SSO 化の取り組みは SRE の責任範囲にて実施することとしました。弊社 CRE については以下もぜひご参照ください。 note.mntsq.co.jp さて前掲の IAM Identity Center に関する記事は Entra ID を IdP として IAM Identity Center から AWS への SSO ログインを可能にするという趣旨になります。つまり IAM Identity Center が抱え込んでいるユーザ / グループの元ネタは Entra ID となります。 ここで IAM Identity Center を IdP として Redash へ SSO するよう構成することで、 Entra ID 側での対応をすることなく Entra ID ユーザ / グループを活用して Redash へのログインが可能になります。 これは Redash を利用する立場にしてみると Entra ID ユーザを使って Redash が使えるという体験につながり、弊社が普段の業務で使う他の SaaS と同様の利用体験が得られる格好になります。サービス利用の煩雑さが低減できるという嬉しさがあるでしょう。 構成 SSO ログイン化前後での構成変遷を示します。なお Redash のためのインフラは EC2 + ALB で構成され、Redash そのものは EC2 インスタンス 内で docker-compose によって動かしています。公式ドキュメントに沿った方法 *1 といえるでしょう。 SSO ログイン前 至極シンプルです。パスワードマネージャ(弊社では全社的に Keeper *2 を使用しています)上の Redash ユーザ名およびパスワードを必要に応じて利用しログインする方式です。 SSO ログイン後 相応に複雑になりました。Redash 以外にも登場人物が増えています。 IAM Identity Center に Entra ID から連携されたユーザを使い、Redash へログインします Redash が出力するログを EC2 から CloudWatch Logs に出力し、そこから Data Firehose を経由し、最終的には S3 に配置します IAM Identity Center 経由での Redash へのログイン履歴を CloudTrail ログ(これも最終的には S3 に配置されます)から追跡します S3 上に配置された各種ログは必要に応じて Athena から検索が可能です SSO 設定 おおむね以下のような順番での作業が必要になります: IAM Identity Center アプリケーションの用意 IAM Identity Center アプリケーションからの必要な情報の取得と設定 Redash 上での SAML 設定 IAM Identity Center アプリケーションの用意 前述のとおり、IAM Identity Center は外部 IdP を引き受けて AWS アカウントへのアクセスを可能とする利用形態以外に、自身が IdP となり AWS サービスや外部サービスへのアクセスを担うこともできます。 これには IAM Identity Center 内でアプリケーションというものを用意し、アクセスさせたいユーザ / グループをアプリケーションに紐付けることで達成可能です *3 。 AWS サービスと連携させる場合とそれ以外とでそれぞれ対応が異なり、今回のように Redash が対象の場合は利用者自身が管理する (customer managed) アプリケーションで構成するとよいでしょう *4 。 Redash 公式ドキュメントのうち SSO に関するもの *5 を読むと、今回使うべきは SAML であることがわかります。したがって IAM Identity Center も SAML を前提としたアプリケーションを用意します。 サクッと Terraform で作ってしまいましょう。コード例は以下の通りです。 こちらを参照 locals { entra_id_groups = { /* Redash へのアクセスを許可したいユーザが属する Entra ID グループ名を指定 Entra ID グループ / ユーザが IAM Identity Center へ SCIM で連携などされていることを前提にする */ redash = [ "Engineer" , "CS" , "Sales" , ... ] } } /* IAM Identity Center インスタンスに対し必要な設定が行われる これは Terraform リソースでの管理が難しいので、設定は AWS マネジメントコンソール上から行い、Terraform からは参照に留める */ data "aws_ssoadmin_instances" "main" {} # IAM Identity Center アプリケーション定義。SAML を前提とするもの resource "aws_ssoadmin_application" "redash" { name = "Redash" description = "IAM Identity Center appliction to access Redash" application_provider_arn = "arn:aws:sso::aws:applicationProvider/custom-saml" instance_arn = tolist (data.aws_ssoadmin_instances.main.arns) [ 0 ] portal_options { visibility = "ENABLED" sign_in_options { origin = "IDENTITY_CENTER" } } } # IAM Identity Center アプリケーションの利用を許可する Entra ID グループ(が IAM Identity Center に同期されたもの)を参照できるようにする data "aws_identitystore_group" "redash" { for_each = toset (local.entra_id_groups.redash) identity_store_id = tolist (data.aws_ssoadmin_instances.main.identity_store_ids) [ 0 ] alternate_identifier { unique_attribute { attribute_path = "DisplayName" attribute_value = each.key } } } # アプリケーション / Entra ID グループ間の紐付けをする resource "aws_ssoadmin_application_assignment" "redash" { for_each = toset (local.entra_id_groups.redash) application_arn = aws_ssoadmin_application.redash.arn principal_id = data.aws_identitystore_group.redash [ each.key ] .group_id principal_type = "GROUP" } IAM Identity Center アプリケーションからの必要な情報の取得と設定 ここまでで IAM Identity Center アプリケーションの用意はできました。しかし残念ながら SAML 設定そのものは Terraform では設定できません 。公式ドキュメントを参照しつつ、適宜設定する必要があります。以下を見るとよいでしょう。 docs.aws.amazon.com 明示的な設定が必要になるのは Application metadata における以下内容です。必要な値は Redash 公式ドキュメント *6 をもとに設定します。 なお Application metadata と同じ画面にある IAM Identity Center metadata はその内容を控えておきます。Redash 側の設定で使います。IAM Identity Center としてはこのあたりの設定をまとめた XML ファイルの取得が可能なのですが、生憎 Redash にはこの XML ファイルを読み取れる仕組みがありません。よって逐次設定を行う必要があります。 重要:attribute mapping の設定 ここまで済めば一段落……と思いきや、前述の AWS 公式ドキュメントには以下の一節があります。 However, you must also provide additional SAML attribute mappings for your own SAML 2.0 applications. These mappings enable IAM Identity Center to populate the SAML 2.0 assertion correctly for your application. You can provide this additional SAML attribute mapping when you set up the application for the first time. You can also provide SAML 2.0 attribute mappings on the application details page in the IAM Identity Center console. このあたりも Terraform では直接の設定が不可能です。 AWS マネジメントコンソールから設定してしまうのが手っ取り早いでしょう。 設定した IAM Identity Center アプリケーションの詳細画面から Actions -> Edit attribute mappings と辿り、以下の値を設定します。 User attribute in the application Maps to this string value or user attribute in IAM Identity Center Format Subject(変更不可) ${user:email} emailAddress FirstName ${user:givenName} basic LastName ${user:familyName} basic Redash 上での SAML 設定 基本的には Redash の公式ドキュメントに従えば設定が済みます *7 。Admin グループに属する Redash ユーザでないと設定画面が出ないので注意が必要です。実際に設定が必要な内容は以下の通りです。 SAML Enabled :チェックをいれる SAML Metadata URL :IAM Identity Center 側 IAM Identity Center metadata 内のうち “IAM Identity Center SAML metadata file” で示される URL を記入 SAML Entity ID :こだわりがなければ Redash IAM Identity Center 側 Application metadata 内 “Application SAML audience” と一致していればよい SAML NameID Format : urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress 固定値 ログ監査 ここまでで Redash へ IAM Identity Center を IdP として SSO アクセスする手筈が整いました。折角 IAM Identity Center を経由しての操作となるので、CloudTrail あたりから操作ログを棚卸したくなるのが人情というものでしょう。このあたりの整備をします。 ただし本稿冒頭で Redash 利用者の操作履歴を 部分的に 監査できるようにしました と記載した通り、Redash へのログインからログアウトまでの全操作をユーザにひもづくかたちで追跡できるような状況には出来ていません。ユーザを絞れる状態で追えるログは以下までとなります。 IAM Identity Center (CloudTrail) ログで追跡可 Redash にログインした Redash ログを駆使して追跡可 クエリを実行した(実行したクエリまで追跡化) 以下は上記ログでは追跡不可な様子でした。無論ログ自体はあるのですが、ユーザにひもづく形でのログは存在せず、他の情報と突き合わせて追跡する必要があります。 クエリ実行結果を見た ダッシュ ボードを見た 追跡可能な範囲でログをどのように追えるかを示せればと思います。 IAM Identity Center (CloudTrail) ログから Redash ログイン形跡を追う いきなり結論です。以下のような Athena クエリが使えます。直近7日間のログを追跡する例です select userIdentity.onBehalfOf.userId as userId, json_extract_scalar(serviceEventDetails, ' $["app-id"] ' ) as appId, eventtime from <CloudTrail ログ用 Athena DB> where timestamp between date_format( current_date - interval ' 7 ' day, ' %Y/%m/%d ' ) and date_format( current_date , ' %Y/%m/%d ' ) and eventSource = ' sso.amazonaws.com ' and eventName = ' Federate ' and json_extract_scalar(serviceEventDetails, ' $["app-id"] ' ) = ' <Redash への SAML ログインで使う IAM Identity Center アプリケーション ID> ' 日本語で記載してある箇所について解説します。 CloudTrail ログ用 Athena DB :S3 上にある CloudTrail ログを取り扱う為の Athena DB。 パーティション 射影を使う AWS 公式ドキュメントに沿った内容を前提とする *8 Redash への SAML ログインで使う IAM Identity Center アプリケーション ID :IAM Identity Center アプリケーション に対応する インスタンス の ID 2. については解説が必要でしょう。ここで指定すべき ID は ARN に含まれる ID (ex. api-xxx ) ではなく、 URL 中に示される ID (ex. ins-xxx ) のほうです。伏字が多い例で恐縮ですが、以下 スクリーンショット 中の赤枠の箇所を使って始めて上記クエリで所望のログが得られます。 このあたりについては AWS 公式のドキュメントを含めても公開されている情報がなく、弊社を担当頂いております AWS 社の SA の方々に多大なるご協力を頂き実現したものとなります。この場を借りて感謝申し上げます。なおこの内容は今後変化する可能性が多分にある模様で、その際にログの取り方を変えられるようにしておく必要があります。 弊社では上記 Athena クエリを名前付きクエリとして整備したうえで専用の workgroup を用意し、適当な期間でのログ検索とその結果のレポーティングの仕組みを運用するようにしています。これは今回の Redash ログ以外でも同様の仕組みでレポーティングを行っているものがあるので、それらをまとめて別の機会にブログ化できればと思います。少しだけネタばらしをすると以下のような仕組みです。 Lambda で Athena クエリを実行 Athena がクエリの実行を終える Athena workgroup にひもづくかたちでクエリ実行完了イベントが EventBridge から発火する 3. のイベントを拾う Lambda 関数が Athena クエリ実行結果を取得し、必要な処理を実行 最終的な結果を Lambda -> SNS -> AWS Chatbot -> Slack という流れで通知 Redash ログからクエリ実行形跡を追う 弊社のように EC2 インスタンス 上で Redash をセルフホストする場合、当然ながらそのログは EC2 インスタンス 上に存在することになります。 これではログを継続的に追う場合に都合が悪いので、EC2 上からいったん CloudWatch Logs ロググループにログを出し、そのうえで Data Firehose を経由し S3 にログを出力し、Athena で追跡が可能なようにしています。 EC2 から CloudWatch Logs へログを出すにはいくつか方法がありますが、幸いなことに弊社での Redash の動かし方は docker-compose によるのものなため、以下を利用できます。 docs.docker.com これを使い logging : driver : awslogs options : awslogs-region : ap-northeast-1 awslogs-group : <CloudWatch ロググループ名> awslogs-create-group : "false" といった記述を Redash コンテナ管理用の docker-compose.yml へ追記し、EC2 インスタンス プロファイルへ CloudWatch Logs 関係の必要なアクションを許可してやれば、EC2 から CloudWatch Logs へのログ出力については解決します。具体的には以下アクションが認可できていれば OK です。 logs:CreateLogStream logs:PutLogEvents Redash の典型的なクエリ実行ログは Redash サーバにおける [2025-08-08 02:11:13,060][PID:31][INFO][root] Inserting job for 915b2acd6d435dfd3ea3578a457f4ded with metadata={'Username': u'foo.bar@example.com', 'Query ID': u'1234'} なので、これだけが Athena で取り扱えればひとまずは充分という指針のもと、以下のような仕組みを入れています。 Redash サーバが CloudWatch Logs へログを全て出力する CloudWatch Logs は log subscription filter を介して Data Firehose へログを送出する この際に必要なログだけを Data Firehose へ送出するようフィルタ設定を行う Data Firehose は最終目的地の S3 バケット へログを送出する Terraform コードとしては以下のようになります。EC2 から送られてくるログを一旦集める CloudWatch Logs ロググループの用意から S3 バケット までを担当しています。 こちらを参照 data "aws_caller_identity" "current" {} /* Redash ログ保管用 EC2 内で docker-compose にて動かしているコンテナ群からログを飛ばす想定 */ resource "aws_cloudwatch_log_group" "redash_log" { name = "/$ { var.env } -redash" retention_in_days = 7 # ほとんど実用に堪えない内容なので長く保持しておく必要なし } resource "aws_cloudwatch_log_subscription_filter" "redash_log" { name = "$ { var.env } -redash-log-to-data-firehose" log_group_name = aws_cloudwatch_log_group.redash_log.name /* クエリ実行ログでしか Redash 操作者を絞れない このログは "Inserting job" を含むものになるので、こいつだけを S3 へ送出するログとする */ filter_pattern = "Inserting job" destination_arn = aws_kinesis_firehose_delivery_stream.redash_log.arn role_arn = aws_iam_role.redash_log_to_data_firehose.arn } # Redash ログを Athena で取り扱えるように S3 へ保存したい。そのためのバケット resource "aws_s3_bucket" "redash_log" { bucket = "$ { var.env } -redash-log" } # 半年程度はログを確保できておくようにする。監査目的であればもっと長く取っておくべきかも resource "aws_s3_bucket_lifecycle_configuration" "redash_log" { bucket = aws_s3_bucket.redash_log.id rule { status = "Enabled" id = "delete after 180 days" expiration { days = 180 } filter { prefix = "" } } } resource "aws_s3_bucket_versioning" "redash_log" { bucket = aws_s3_bucket.redash_log.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_public_access_block" "redash_log" { bucket = aws_s3_bucket.redash_log.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } resource "aws_s3_bucket_server_side_encryption_configuration" "redash_log" { bucket = aws_s3_bucket.redash_log.id rule { apply_server_side_encryption_by_default { sse_algorithm = "AES256" } } } data "aws_iam_policy_document" "redash_log_bucket_policy" { statement { effect = "Allow" actions = [ "s3:PutObject" ] resources = [ "$ { aws_s3_bucket.redash_log.arn } /*" ] principals { type = "Service" identifiers = [ "logging.s3.amazonaws.com" ] } condition { test = "StringEquals" variable = "aws:SourceAccount" values = [ data.aws_caller_identity.current.account_id ] } } } resource "aws_s3_bucket_policy" "redash_log" { bucket = aws_s3_bucket.redash_log.bucket policy = data.aws_iam_policy_document.redash_log_bucket_policy.json } /* CloudWatch Logs から S3 へログを保存するための Data Firehose リソース CW Logs 側 subscription filter を経由して流れてくるデータを扱う */ resource "aws_kinesis_firehose_delivery_stream" "redash_log" { name = "$ { var.env } -redash-log" destination = "extended_s3" extended_s3_configuration { role_arn = aws_iam_role.redash_log_to_s3.arn bucket_arn = aws_s3_bucket.redash_log.arn buffering_size = 64 # MB 単位。dynamic partitioning が有効の場合必須 dynamic_partitioning_configuration { enabled = "true" } processing_configuration { enabled = "true" processors { type = "MetadataExtraction" parameters { parameter_name = "JsonParsingEngine" parameter_value = "JQ-1.6" } parameters { parameter_name = "MetadataExtractionQuery" parameter_value = "{prefix: {dummy: (\"logs\")} | .dummy}" # 実質的に "logs" という固定文字列を返すだけ } } processors { type = "Decompression" parameters { parameter_name = "CompressionFormat" parameter_value = "GZIP" } } processors { type = "AppendDelimiterToRecord" } } prefix = "!{partitionKeyFromQuery:prefix}/$ { data.aws_caller_identity.current.account_id } /!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/" error_output_prefix = "error/!{firehose:error-output-type}/$ { data.aws_caller_identity.current.account_id } /!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/" cloudwatch_logging_options { enabled = true log_group_name = aws_cloudwatch_log_group.redash_log.name log_stream_name = "S3Delivery" } } } Athena テーブル / ビューの例は以下のようになります。Redash ログは少々やんちゃな内容となる場合があり、冗長な表記になっています。 CREATE TABLE 文 CREATE EXTERNAL TABLE " redash_server_log_source " ( messageType string, owner string, logGroup string, logStream string, subscriptionFilters array<string>, logEvents array<struct<id:string, timestamp :bigint, message:string>> ) PARTITIONED BY ( ` timestamp ` string ) ROW FORMAT SERDE ' org.openx.data.jsonserde.JsonSerDe ' LOCATION ' s3://mntsq-ops-redash-log/logs/465191204183/ ' TBLPROPERTIES ( ' projection.enabled ' = ' true ' , ' projection.timestamp.type ' = ' date ' , ' projection.timestamp.format ' = ' yyyy/MM/dd ' , ' projection.timestamp.range ' = ' 2025/01/01,NOW ' , ' projection.timestamp.interval ' = ' 1 ' , ' projection.timestamp.interval.unit ' = ' DAYS ' , ' storage.location.template ' = ' s3://mntsq-ops-redash-log/logs/465191204183/${timestamp}/ ' ); CREATE VIEW 文 CREATE OR REPLACE VIEW redash_server_log AS WITH unnested_logs AS ( SELECT " day " , l.id AS event_id, from_unixtime(l. " timestamp " / 1000 ) AS timestamp , l.message FROM " redash_server_log_source " CROSS JOIN UNNEST(logEvents) AS t (l) ), parsed_logs AS ( SELECT *, -- TRY関数を追加 TRY(regexp_extract(message, ' ([ -:,0-9]+) ' , 1 )) AS log_timestamp, TRY(regexp_extract(message, ' \[PID:([0-9]+)\] ' , 1 )) AS pid, TRY(regexp_extract(message, ' \[([A-Z]+)\] ' , 1 )) AS log_level, TRY(regexp_extract(message, ' Inserting job for ([a-f0-9]+) ' , 1 )) AS job_id, TRY(regexp_extract(message, ''' Username '' : u '' ([^ '' ]+) ''' , 1 )) AS username, TRY(regexp_extract(message, ''' Query ID '' : u '' ([^ '' ]+) ''' , 1 )) AS query_id FROM unnested_logs ) SELECT " day " , event_id, timestamp , log_timestamp, pid, log_level, job_id, username, query_id, message AS original_message FROM parsed_logs; なおテーブル以外にビューを作成しているのは Data Firehose 内でログの整形などをせず S3 に送出しているために本来欲しいログが message エントリ内に押し込められている点の対策によります。これは拙稿の以下と同様です。 tech.mntsq.co.jp おわりに IAM Identity Center を IdP として SSO アクセスするケースについて、Redash を題材に解説しました。Redash への SSO アクセスに必要な情報よりもログ関連のオペレーションに関する内容のほうが多いように思い、本稿題名をミスったのではないかと今にして思ってしまいました。 SSO アクセスすることでの ID 管理手法の整頓やサービス利便性向上もさることながら、利用対象とは別の箇所でログが得られることでの状況把握が出来るようになるというメリットも、外部 IdP を使うことでのメリットといえるでしょう。本稿で述べたように CloudTrail ログが使える場合であってもその探索には難儀するケースがありますが、手を尽くせば追跡は可能という事実に勇気付けられる面もあり、また実務上も有益な点はあると感じました。 Redash をお使いの方で IdP の選定に悩まれている方の一助になれば幸いです。 MNTSQ 株式会社 SRE 秋本 *1 : https://redash.io/help/open-source/setup/#-Docker *2 : https://www.keepersecurity.com/ja_JP/solutions/enterprise-password-management.html *3 : https://docs.aws.amazon.com/singlesignon/latest/userguide/manage-your-applications.html *4 : https://docs.aws.amazon.com/singlesignon/latest/userguide/customermanagedapps.html *5 : https://redash.io/help/user-guide/users/authentication-options/ *6 : https://redash.io/help/user-guide/users/authentication-options/#Self-Hosted-SAML *7 : https://redash.io/help/user-guide/users/authentication-options/#SAML-2-0 *8 : https://docs.aws.amazon.com/athena/latest/ug/create-cloudtrail-table-partition-projection.html
アバター
SREチームの藤原です。 サービスを運用しているとデータ分析や他システムとのデータ連携などで、DBへの読み取り専用アクセスが必要になる場合は多々あると思います。 本エントリでは、 AWS PrivateLink を活用することで、セキュアなクロスアカウントかつ、クロス VPC なアクセスを実現できるのでその方法を解説します。 aws.amazon.com 構成のイメージ 本エントリでは、2つのアカウントの間でのクロスアカウント & クロス VPC アクセスを前提とします。 アカウントAがアクセスする側、アカウントBがアクセスされる側とします。 PrivateLink設定前のアカウントBは図1のような構成です。 図1. アカウントB(アクセスされる側)の構成 この環境に対して、他の AWS アカウント(この図ではアカウントA)の VPC からアクセスしたいケースを想定しています。 具体的には図2のような構成を想定しています。 図2. アカウントAからアカウントBのRDSへのアクセスイメージ ここではアカウントAからアカウントBへの具体的なアクセス方法については示していません。 PrivateLinkを使った接続をする前に VPC ピアリングや、Transit Gateway などのサービスを使った VPC 間接続と今回の要件における課題について述べたのち、PrivateLinkを使った接続方法について解説します。 VPC ピアリングやTransit Gateway を使った VPC 間接続とその課題 クロスアカウント、クロス VPC でのリソースアクセスを考えると、昔から AWS を使っている人であれば、 VPC ピアリング( VPC ピア機能)と AWS Transit Gateway の活用が浮かぶと思います。 docs.aws.amazon.com aws.amazon.com 今回これでも要件は達成できるのですが、注意点が2つほど存在します。 ひとつ目が VPC ネットワークのCIDRが重複している場合は VPC ピアリングもTransit Gateway も使えません。 もうひとつがセキュリティ的な課題です。 VPC ピアリングやTransit Gateway での接続はIP層でネットワークを直接繋げるような形になります。 このような場合、意図しないリソースへのアクセスを防ぐためには、セキュリティグループなどの設定を細かく設定、管理しなければいけません。 今回の事例では、CIDRの重複は発生していませんが、意図しないリソースへのアクセスの可能性を排除する目的で、PrivateLinkを活用することとしました。 PrivateLinkを使ったセキュアな VPC 間接続 PrivateLinkを活用した場合の VPC 間の接続は次の図3のようになります。 VPC エンドポイントを介してアカウントA、Bそれぞれの VPC 間を接続し、 Aurora MySQL のリーダーエンドポイントのみへの到達性を確保 します。 図3. PrivateLinkを使った接続 以降では、まずアカウントB側の準備を行い、その後でアカウントA側の準備を進めていきます。 なお、 Terraformでの定義例を示していますが、アカウントAとBのリソースはそれぞれ異なるstateファイルで管理していることを前提としています 。 アカウントB(アクセスされる側)でのリソースの準備 まず、アカウントB側でResource Gateway とResource Configurationsを作成します(図4)。 図4. Resource ConfigurationとResource Gateway AWS VPC Latticeにおいて、Resource Gateway に対応するTerraformのリソースは aws_vpclattice_resource_gateway です。 このリソースはTerraformを利用する場合、次のように定義します。 resource "aws_vpclattice_resource_gateway" "gateway" { name = Resource Gatewaysの名前 vpc_id = Resource Gatewayを設置するVPCのID subnet_ids = [ Resource Gatewayを設置するサブネットIDのリスト ] security_group_ids = [ Resource Gatewayに設定するセキュリティグループIDのリスト ] } Resource Configurationに対応するTerraformのリソースは aws_vpclattice_resource_configuration です。 resource "aws_vpclattice_resource_configuration" "configuration" { name = Resource Configurationの名前 resource_gateway_identifier = aws_vpclattice_resource_gateway.gateway.id port_ranges = [ "3306" ] protocol = "TCP" resource_configuration_definition { dns_resource { domain_name = Aurora MySQLのリーダーエンドポイントのFQDN ip_address_type = "IPV4" } } } resource_gateway_identifier パラメータで接続先となるResource Gateway のIDを指定します。 resource_configuration_definition の dns_resource ブロックで接続先となるリソースの ドメイン を指定します。 Aurora MySQL のリーダーエンドポイントの FQDN をここで指定することにより、 PrivateLinkの接続先をリーダーエンドポイントのみに限定できます 。 次に、アカウントAから VPC エンドポイントを通じて接続できるようにする必要があります。 このために AWS Resource Access Manager(以降、 AWS RAM)を利用して、 Resource ConfigurationをアカウントAに共有します 。 aws.amazon.com RAMで共有するリソースをTerraformで定義する場合は、 aws_ram_resource_share 、 aws_ram_principal_association 、 aws_ram_resource_association の3つのリソースを利用します。 resource "aws_ram_resource_share" "resource_configuration_share" { name = リソース共有の名前 } resource "aws_ram_principal_association" "resource_configuration_share" { resource_share_arn = aws_ram_resource_share.resource_configuration_share.arn principal = リソースの共有先となるAWSアカウントID(アカウントAのアカウントID) } resource "aws_ram_resource_association" "resource_configuration_share" { resource_arn = aws_vpclattice_resource_configuration.configuration.arn resource_share_arn = aws_ram_resource_share.resource_configuration_share.arn } aws_ram_resource_share はリソース共有そのものを定義します。 aws_ram_principal_association では、リソースを共有する先のアカウントなどを指定します。 今回はアカウントAとリソースを共有したいので、 aws_ram_principal_association にはアカウントAの AWS アカウントIDを指定します。 最後に aws_ram_resource_association では、具体的に aws_ram_resource_share で共有したい AWS リソースを指定します。 上記の例では、Resource Configurationを共有対象としています。 これにより、アカウントB側のResource ConfigurationがアカウントA側で利用する準備ができました。 アカウントA(アクセスする側)でのリソースの準備 アカウントAでは、アカウントBから AWS RAMを通じて共有されたResource Configurationを受け入れる必要があります。 AWS 管理コンソールの AWS RAM画面からリソース共有を受け入れます 1 。 図3. PrivateLinkを使った接続(再掲) アカウントA側では、Resource Configurationに紐づいた VPC エンドポイントを定義する必要があります。 まず、 VPC エンドポイントに紐づける共有リソースの情報を取得するため、 aws_ram_resource_share のDataソースを使用します。 data "aws_ram_resource_share" "rds_resource_config" { name = リソース共有の名前  # アカウントBで共有されたリソースの名前 resource_owner = "OTHER-ACCOUNTS"   # 他アカウントから共有されたIPアドレス } name でリソース共有の名前を指定、 resource_owner にOTHER-ACCOUNTSを指定することで、アカウントBから共有された AWS リソース(今回の場合はResource Configuration)を取得できます。 次に、取得した aws _ram_resource_shareのリソースARNを VPC エンドポイントに関連付けます。 VPC エンドポイントに対応するリソースは aws_vpc_endpoint です。 resource "aws_vpc_endpoint" "rds_endpoint" { vpc_endpoint_type = "Resource" vpc_id = VPCエンドポイントを作成するVPCのID subnet_ids = [ VPCエンドポイントを作成するVPCのプライベートサブネットIDのリスト ] resource_configuration_arn = data.aws_ram_resource_share.rds_resource_config.resource_arns [ 0 ] } これにより、アカウントAのプライベートサブネットに VPC エンドポイントが作成され、サブネットに応じたプライベート IPアドレス が割り当てられます。 この状態でも機能しますし、アカウントBのAurora MySQL にアクセスできますが、 IPアドレス ベースよりもホスト名ベースでアクセスする方が管理しやすいため、 IPアドレス と FQDN を関連付けましょう。 Route53のプライベートホストゾーンを準備し、そのプライベートホストゾーン内でAレコードを定義します。 aws_vpc_endpoint.rds_endpoint の subnet_configuration から VPC エンドポイントの IPアドレス を取得できるので、Terraformの 局所変数 ( rds_endpoint_ip_addresses )を使って IPアドレス のリストを取得します。 locals { rds_endpoint_ip_addresses = sort ( [ for address in aws_vpc_endpoint.rds_endpoint.subnet_configuration : address.ipv4 ] ) } resource "aws_route53_record" "production_rds_endpoint_dns" { zone_id = プライベートホストゾーンのID name = VPCエンドポイントに紐づけたいFQDN type = "A" ttl = 60 records = local.rds_endpoint_ip_addresses } これにより、アカウントAのプライベートサブネットからアカウントBのAurora MySQL のリーダーエンドポイントへセキュアにアクセスできるようになりました。実際の接続コマンドは以下のような形になります。 mysql -u ユーザー名 -p -h VPCエンドポイントのFQDN これにより、アカウント横断かつ VPC 横断でAurora MySQL のリーダーエンドポイントへのセキュアなアクセスを実現できました。最終的な構成のイメージとしては図3のとおりになります。 まとめ クロスアカウント、クロス VPC でのリソースアクセスに際してPrivateLinkの活用する場合の例について解説しました。 VPC ピアリングや、Transit Gateway を使う方法もありますが、ネットワーク観点やセキュリティ観点での考慮事項(ネットワークのCIDRや、セキュリティグループなど)を減らせるため、今後はPrivateLinkを第一候補として考えても良いでしょう。 aws_ram_resource_share_accepter リソースを定義して受け入れることも可能ですが、一時的に必要なリソースであるため、 AWS 管理コンソールから手動で受け入れています) ↩
アバター
SREチームマネージャーの藤原です。 本エントリでは、現在構築中の新サービスにて利用する予定の CloudFrontおよび VPC Originの活用と、CloudFrontを経由した静的リソース配信について解説します。 シンプルな構成へのCloudFrontの導入 まずは非常にシンプルなアプリケーションを考えてみます(図1)。 単一のバックエンドのコンピューティングリソースに外部向けの API と内部向けの API がある場合、インターネットからアクセス可能な外部向けの ロードバランサー と、内部からのみアクセス可能な内部向けの ロードバランサー を分けて設定するパターンがあります。 図1. シンプルな構成の ウェブアプリケーション このような構成の場合、静的リソース(画像やjs, css など)はバックエンドのコンピューティングリソースから配信することになります。 静的リソースはコンピューティングリソースを消費しない形で配信したい ところです。 そこで、 CDN とオブジェクトストレージを活用します。 AWS の場合、CloudFrontとS3を利用することになります。この場合、インターネットと外部向けアプリケーション ロードバランサー の間にCloudFrontの ディストリビューション を配置、静的リソース配信用のS3 バケット を作成することになります(図2)。 図2. 単純にCloudFront ディストリビューション と静的リソース配信用 バケット を導入 CloudFrontと静的リソース配信用 バケット の間については、OAC(Origin Access Control)を利用することでセキュアに接続できます 1 。 図2の構成では静的リソースを CDN を用いてコンピューティングリソースを使用することなく効率的に配布することができます。 さらに、外部向けアプリケーション ロードバランサー からの動的リソース配信と同じ ドメイン 名を使って静的リソースを配信できるため、CORS 2 などを意識する必要もありません。 ただし、この構成では、CloudFront ディストリビューション と外部向けアプリケーション ロードバランサー の間に課題が埋まっています。 外部向けアプリケーション ロードバランサー の抱える課題 外部向けアプリケーション ロードバランサー はパブリック IPアドレス を持っています。 また、 AWS マネージドな FQDN も持つため、CloudFrontを経由せずに直接アクセスが可能です(図3)。 図3. インターネットから外部向けアプリケーション ロードバランサー への直アクセス(破線部) 一定以上の利用があるサービスを運用したことのある方にはわかると思いますが、 ロードバランサー の IPアドレス や AWS マネージドな FQDN を直接指定したHTTPリク エス トは、想像以上に多く発生します 3 。 ロードバランサー への直接アクセスをブロックしつつ、CloudFrontを経由したアクセスを通したい場合、次のような対策が基本として考えられます。 CloudFrontでHTTPヘッダを追加し、ロードバランサのリスナールールで特定のヘッダがある場合のみリク エス トを フォワ ードする 4 CloudFrontの AWS マネージド プレフィックス リストを ロードバランサー のセキュリティグループに適用する 5 それでも、外部向けアプリケーション ロードバランサー にはパブリックな IPアドレス (や AWS マネージドな FQDN )が割り当てられたままです。そこで「パブリック IPアドレス も FQDN も持たない ロードバランサー をCloudFrontのオリジンとして使えないだろうか」と考えたくなります。この考えを実現してくれるのがCloudFrontの VPC Originです。 CloudFrontの VPC Origin CloudFrontの VPC Origin機能を利用すると、プライベートサブネット内にENI(Elastic Network Interface)が作成されます。このENIを経由して、CloudFront ディストリビューション とアプリケーション ロードバランサー 間の通信が行われます(図4)。 図4. VPC Originの利用によるENIの設置 VPC Originを利用することで、CloudFront ディストリビューション のオリジンを、パブリックサブネットに配置された外部向け ロードバランサー ではなく、プライベートサブネットに配置された ロードバランサー (以前の図における内部向け ロードバランサー )とすることができます。 プライベートサブネットに配置されたアプリケーション ロードバランサー は内部向けのため、パブリック IPアドレス を持ちません。 これにより、CloudFront ディストリビューション を経由しないインターネットからの直接アクセスを完全に排除できます 。 VPC Originを導入した後の構成 VPC Originを導入した後の構成としては図5のようになりました。 図5. VPC Origin導入後の構成 図1と図5を比較すると、以下の機能的な違いがあります。 静的リソースを配信用 バケット で提供することで、コンピューティングリソースを消費せずに配布できるようになった 図2と図5を比較すると、主な差分は次の2点です。 アプリケーション ロードバランサー へのインターネットからの直接アクセス経路が完全に排除された 外部 ロードバランサー と内部 ロードバランサー が一本化され、内部 ロードバランサー 相当のもののみとなった 単純な構成の差分としてはここまでです。運用時も見据えると以下の観点でメリデメが生じます。 メリット アプリケーション ロードバランサー へのインターネットからの直アクセスを考える必要がない(セキュリティ的なメリット) インターネット - ロードバランサー 間の経路そのものがなくなった 設定管理対象の削減(構成管理上のメリット) アプリケーション ロードバランサー が2台から1台になった デメリット アプリケーション ロードバランサー の設定が複雑化する可能性(構成管理上顕在化する可能性のあるもの) 外部向けと内部向けのリスナールール両方を一つのリスナー設定に含める必要がある アプリケーション ロードバランサー のリスナールールは外部向けと内部向けの両方を単一の場所にまとめて記述する必要があるため、複雑なルールを設定する場合は注意が必要です。 一方で、設定が一箇所にまとまることで「あれ、これはどのロードバランサに含まれていたっけ?」といった混乱が起きにくくなるため、リスナー設定がシンプルな場合はデメリットは最小化できます。 今回設計したサービスではリスナールールに複雑な要素がほとんどなかったため、外部向けと内部向けの ロードバランサー を一つに統合した際のデメリットの顕在化はほぼなく、直接アクセス経路の排除というメリットのみを実質的に得ることができました。 まとめ 本エントリでは、CloudFrontの VPC Originの利用による構成への影響、CloudFrontを使った静的リソースの配信について触れました。 特にCloudFrontの VPC Originを利用することでパブリックサブネットの利用を最小化することはセキュリティ観点でポジティブに捉えて良いでしょう。 今後はCloudFront + VPC Origin + ロードバランサー の組み合わせが AWS の構成の デファクトスタンダード となる可能性もあるかもしれないとも感じました。 また、静的リソース配信についてはCloudFront + S3の組み合わせが大量アクセスを捌くといった観点でも非常に重要なので設定しない手はないと言えるでしょう。 Amazon CloudFront オリジンアクセスコントロール(OAC)のご紹介 ↩ オリジン間リソース共有 (CORS) - HTTP | MDN ↩ これは一種の攻撃に類するものではあるので、何かしらの対策はした方が良いでしょう。前職にて対策において参考となりそうな情報を提示しています。 WAFを活用する上で入れておきたいファイアウォールのルール定義 - STORES Product Blog ↩ Application Load Balancer へのアクセスを制限する - Amazon CloudFront ↩ Amazon CloudFront用のAWS マネージドプレフィックスリストを使用したオリジンへのアクセス制限 | Amazon Web Services ブログ ↩
アバター
はじめに こんにちは、MNTSQ株式会社でSREをやっている西室と申します。私生活ではゲーム以外でPCを使わないので、最新技術へのアンテナ感度はエンジニアとしては最低クラスです。未だに タッチタイピング ができません。 さて、最近巷では「生成AIがすごい」だの「使えないと時代に取り残される」だの、何かと話題が尽きないですが、まだ業務にうまく活用できていないという方も多いのではないでしょうか? かくいう私も「なんか調べるのが億劫だな〜」と、ChatGPT以外には手を出していなかったのですが、半年ほど前に開発チームに Devin が導入されたので試しに使ってみたところ、これがなんともう 世界が一変するくらい便利 でした......。この感動を共有するために、本記事にて活用事例を紹介してみようと思います。 繰り返しますが、私はAIに関する専門知識もありませんし、技術が好きというわけでもありません。 生成AIを便利に活用するだけなら高度な知識は必要ありません 。 はじめに その前にDevinとは 活用事例紹介 仕組みを作らせる系 GuardDutyのアラートをGitHubにIssueとして起票するLambdaを実装してもらった ECSタスク定義の差分をPRにコメントするGitHub Actionsの実装してもらった コード整頓系 デプロイWFの実装シンメトリを揃えてもらった ハードコードされていたオートスケールのパラメータを変数化してもらった 壁打ち系 使い慣れていないリポジトリに大きな変更を行う際の調査・設計を手伝ってもらった まとめ Devinを活用するには おわりに その前にDevinとは いったいなんなんでしょう。私にはよくわかりません。半年前に会社の人が導入してくれました。詳しいことはChatGPTとかに聞いてください。他にもClaude Codeとかも流行ってるという噂を聞きました。 私にわかることは以下のみです。 なんかお願いすると リポジトリ のコードとかをみていい感じにPRを作ってくれる PRコメントやプロンプトを使って対話形式で追加修正もしてくれる 「ナレッジ」と呼ばれる約束事みたいなのを人間の言葉で設定できて、基本的にそれを守って作業してくれる devin.ai 活用事例紹介 記事の趣旨から外れるため、生成した ソースコード などは載せていません。「こんな風に使えるんだ〜」というのを感じ取っていただければ幸いです。 仕組みを作らせる系 GuardDutyのアラートを GitHub にIssueとして起票するLambdaを実装してもらった 弊社SREチームではセキュリティ関連のさらなる充実をミッションにしており、 GuardDuty を使用した脅威検出などの整備を進めています。今までの対応フローは 脅威検出 -> Slack通知 -> Issueの手動起票 -> 対応 -> Issueクローズ という流れでしたが、Issueを手動で起票するのがいい加減大変になってきたため、この部分をDevinに自動化してもらいました。 ※ 書いたプロンプト <terraformファイルへのリンク>で定義されている aws _ sns _topic.chatbot_guarddutyは、GuardDutyで脅威が検出された際、EventBridge -> SNS -> Chatbot -> Slackと通知を行うための SNS である。この SNS からLambdaにもファンアウトして、以下のような処理を行いたい 1. 脅威のタイトルと同名のIssueを GitHub のmntsq-infra リポジトリ に作成する 2. 本文には適当な概要と、検出結果へのリンクを貼る 3. guardduty/<レベル>のタグをつける<レベル>は脅威の重要度(低~クリティカル) 4. MNTSQ/sreのチームを アサイ ンする かなり大雑把な指示ですが、Devinは リポジトリ の ソースコード を読んで、どうのように実装すれば良いか、周りのコードとシンメトリーを合わせながら実装を行ってくれました。いきなり完璧なものが出来上がるわけではないので、PRにコメントする形で何度かやり取りをしました。同僚とチャットするような感覚で、ラフに不足箇所を指摘します。 ※ やり取りの雰囲気(実際にはもうちょいコンテキストの深いやり取りもしています) Devin 「できたよ。 GitHub tokenが必要だからこうやって設定してね」 わたし「了解。Lambdaのソースファイルはそっちじゃなくてこっちに置いて」 Devin 「OK」 わたし「token平文でLambdaの 環境変数 に突っ込んでるのまずいから、処理の中でSSMから取得するようにして」 Devin 「ごめん、直したよ」 わたし「Lambdaに〜の権限つけ忘れてるよ」 Devin 「ごめん、直したよ」 わたし「動いたわ、サンキュー」 結果、GuardDutyで脅威検出した際には以下のようなIssueが起票されるようになりました!下手に人間が作業するよりも、綺麗なフォーマットで起票してくれる実装になっていて嬉しいですね。細かいトラブルシュートまで含め、PR完成までの作業時間としては4時間くらいでしたが、 Devinがコードを書いている間の待ち時間をチャットの返信や細かいタスクに当てられるので、非常に効率的 に作業ができました。 起票されるようになったIssue ECSタスク定義の差分をPRにコメントする GitHub Actionsの実装してもらった 弊社ではタスク定義のテンプレートをアプリケーションのコードと同じ リポジトリ で管理しています。アプリケーションに変更が入ると、CDによって自動で AWS 環境のECSサービスの更新までが行われます。先日、タスク定義テンプレートの修正に不備があり、今まで値が入っていた 環境変数 が空になってしまったことによる障害が発生しました。障害を受けての恒久対策として、PR作成時に、CDによってタスク定義が変わる場合はその差分をPRにコメントするGitHubActionを作成してもらいます。 以下のようにプロンプトを書いてみました。 < リポジトリ 名>に以下のようなGithubActionのワークフローを作成したい。 * <ブランチ名>へのPR作成をトリガに動く * マージ後、デプロイのWF(deploy_clm, deploy_connectmail, deploy_ibreoffice, deploy_search_ api )が動いた際に新規作成されるタスク定義の差分表示とバリデーションを行う * deploy_clm, deploy_connectmail, deploy_ibreoffice, deploy_search_ api にて作成されるタスク定義の JSON と、ECSの最新のタスク定義 JSON のenvironment以下の要素を比較し、差分をどのタスク定義の差分なのかがわかる形でPRにコメントする * 新たに作成されるタスク定義のenvironmentに Value が空のものがあれば失敗させる deploy_clmとかいっているのは、サブサービス毎のデプロイのWFのことです。弊社ではサブサービス毎にデプロイの単位を分割しているので、WFファイルも複数に分かれていますが、それぞれのWFの実装方法に結構な差分がありました。 コードベースがごちゃついていることに加え、この丸投げするような指示の仕方がDevinを混乱させてしまい 、見当違いのPRばかりが作成されてしまいました。 そこで、以下のように作業を分割しました。 似たような処理を共 通化 して実装差分をなくす デプロイWF内のタスク定義 JSON を生成するステップ群を composite 化して、各デプロイのWFからはこのcompositeを使用してタスク定義 JSON を生成するようにしてもらう タスク定義の差分比較ロジックを共 通化 して再利用可能にする inputに与えた、タスク定義 JSON およびタスクファミリーのlatestの定義を比較して、 環境変数 の差分を指定したPRにコメントするcompositeを作成する バリデーションWFの外枠を整える PR作成をトリガとし、コード差分があったサブサービスについてタスク定義を作成し、2で作成したcompositeを使用してタスク定義の比較を行うWFを作成する 1の作業でコードのシンメトリーが揃い、Devinがコード構造を理解しやすくなります。加えて2,3の作業のように変更が大きくなりすぎないように指示を出すことによって、Devinもやるべきことを間違えずらいし、レビューもしやすくなりました。Devinに指示を出すときは、 新卒の社員にタスクを渡す感覚で分割する こと、そもそも 理解しやすいようにコードベースを整理しておく ことが大切みたいです。 PR作成時に、タスク定義の差分が表示されるようになりました! タスク定義に差分できる場合のコメント コード整頓系 デプロイWFの実装シンメトリを揃えてもらった コードが整頓されていることが、Devinの活用のしやすさにつながることは前述のとおりです。であれば、コードの整頓もDevinにやってもらいましょう。 リポジトリ に似たような処理が複数あるが、実装方法が微妙に異なる場合を考えてみます。まずは1つ自前で(Devinにやらせてもいいですが)お手本の修正PRを作ります。そしてDevinに以下のように指示します。 ※先ほども例に挙げた、サブサービス毎のデプロイWFのシンメトリーを揃えてもらうための指示 <マージ済みのお手本PRのリンク>と同じように、. github /workflows/deploy_connectmail.ymも修正してください。以下の点に気をつけてください。 * 変数設定のロジックをset-variablesというjobに集約すること * 変数ファイルの読み込みには. github /actions/load-env-variables/action.ymlを使用すること 注意して欲しい箇所だけプロンプトに書いておけば、あとは お手本を参照して綺麗に一発で作業してくれました 。これをサブサービス毎に行い、デプロイWFの実装のシンメトリーを完全に揃えることができました。 お手本があり、それを模倣して実装するという作業ではかなりの威力を発揮してくれるようです 。 ハードコードされていたオートスケールのパラメータを変数化してもらった terraformのvariableにすべき箇所をハードコードしてしまっているので、直してほしいというケースです。特に今回治したかった箇所は、カスタムメトリクス化している自前のキューのメッセージ滞留数を 閾値 としてオートスケールさせている箇所で、設定が複数のリソースにわたっています。 人力で影響範囲を網羅するのが少し大変 です。そんな作業も、 リポジトリ 構造を把握しながら作業してくれるDevinにお願いすれば一発です。 <関連 ディレクト リ>以下にハードコードされているオートスケールの設定を、<変数ファイル>に変数として抜き出して、気軽に書き換えられるようにしたい。設定したい項目はここら辺。 * オートスケールの上限 * スケールアウトを行うキュー滞留の 閾値 * 1回のスケールアウトでタスクを何台追加するか * 1回スケールアウトしてから再度スケールアウトするまでのクールダウン時間 とりあえず実装方針を確認したいから、<サブサービス>についてのみ上記の対応を行なって 変更が大きくなる場合は局所に絞って変更するように指示を出すと、レビューがしやすい です。方針をレビューしたら、他の部分も同じように変更するように指示を出しましょう。この変更も、特に直しの必要はなく一発でやってくれました。こういう単純な作業は人間よりも明らかにAIの方が得意ですね。 壁打ち系 使い慣れていない リポジトリ に大きな変更を行う際の調査・設計を手伝ってもらった 弊社ではインフラとアプリケーションの リポジトリ は分かれており、私はアプリケーションの リポジトリ には明るくありません。ところが少々アプリケーション リポジトリ を読み込まないければならない事情があり、Devinに相談に乗ってもらいました。 ※ ElasticSearchのインデクシングをBlue/Greenで行いたいんだけど......という相談 < リポジトリ 名>について。<ファイル名>の"initialize"のタスクについて、その処理時間がプロダクト運用上大きな問題になっている。弊社では頻繁にインデックス構造の見直しがあり、その度にこのタスクを実行するが、その間インデックスが使用できなくなるため、サービスを停止してインデクシングを行なっている。 サービス停止を行わずにインデックスを更新できるように、blue/greenでinitializeのタスクを実行できるようにしたい。以下は現時点で考えているア イデア の箇条書き * ノードではなくインデックスレベルでblue/greenの面を作る * initializeタスク実行時、現在使用していない面について更新を実行し、使用している面はその間も停止時間なく使用可能とする * タスク実行中にインデックスの追加・更新などのメッセージがキューに詰まれる時、それはblueとgreen両方の面の分積む(blue/green差分が生じないように) * 更新が終わったら面の切り替えを行う * 平時は使用していない面は落としておく とてもざっくりしていて申し訳ないけど、 * もっと良いア イデア はないか * このア イデア に追加したい項目はないか * クラスレベルの抽象度で、実装するとしたらどのような変更を加えるか など、考えを聞かせてほしい。 膨大な ソースコード を読み込まなければ検討すらできないような大きな課題ですが、なんとものの数分で複数のわかりやすい図とともに実装計画をドラフトしてくれました! もちろん考慮漏れは多々あるので、少しでも疑問を感じたら実際のコードを読みにいって、方針の妥当性を確認し、必要なら直してもらいます。以下のようなやり取りを延々と続けて、気になる点をひたすら潰していきました。 こんなやり取りを数十往復した 2日ほどかけ、最終的に、 とりあえず検証のために仮実装できるイメージを持つところまで到達できました 。クラス図や処理フロー毎のシーケンス図も、プロンプトでのやり取りを反映しながらブラッシュアップしてくれました。作成してもらった図は、現在の構造を表すものまで含めると合計で10個になりますが、これを手で作ったとするとかなりの 工数 がかかったはずです。もちろん実際に実装を始めてみると問題も出てくるかもしれませんが、 たった2日程度でほとんど手を加えたことのない リポジトリ の(必要な箇所の)構造把握と、大規模な変更プランをイメージできるようになれるのであれば、破格の作業効率ではないでしょうか。 仮完成したクラス図 仮作成したシーケンス図 まとめ Devinを活用するには ダラダラと活用事例を書いてきましたが、結局のところ以下のようにまとめられると思います。 Devinを活用するための前提 適切なタスク分解・わかりやすい指示ができる コードベースがある程度整っている レビュー・動作確認を怠けないマインドを持っている(とても重要) Devinは決して万能の存在ではなく、素晴らしく作業が早い新入社員と思って接するのがちょうど良い なと思いました。うまく指示を出してあげないと見当違いのことをしますし、ぐちゃぐちゃのコードを理解するのには人並みに手こずるようです。また、ちゃんと指示を出したつもりでも細かいミスはします。 Devinを使うべきでないシチュエーション タスクを 言語化 ができない場合 実装後の差分が大きくなり過ぎてしまう場合 これはシンプルに 「自分が理解していないタスクは任せるな」 ということです。 言語化 できないのは論外ですし、実装後の差分が大きくなりすぎるということは、そのタスクをきちんとサブタスクに分けられていないということです。(≒作業見通しが甘い)逆に自分が理解していて、適切な粒度に分けられるタスクは、積極的に任せて問題ないなと思います。しかし、 理解できていないタスクを理解するための壁打ち相手としては優秀 です。 おわりに ここ最近クローズしたIssueを振り返ると、8割近くをDevinに実装をしてもらっていました。もう私いらないんじゃないかな? というのは冗談で、生成AIはあくまでツールです。使う側が リテラシー を持って扱う必要があります。Devinを業務に導入するようになってから、決して大袈裟ではなく 業務効率は倍以上になった と感じます。今後は、自分でコードを書けるだけでなく、生成AIに適切な指示を出し、その結果を正しく評価できることが、エンジニアの市場価値につながっていくのだと思います。 先ほど「Devinは新入社員と思って接するのがちょうど良いと」いう例えをしました。いかに彼らに的確な指示を出し、円滑に協力できるか。生成AIを扱う上で求められる能力は、いわゆる「コミュニケーション能力」に近いのかもしれません。(詳しい人はもっとプロンプトに関わるハックなど持っているのかもしれません。あくまで私くらいの活用レベルでの話です)この感覚が正しいのであれば、"そこそこ" 程度の技術力は、もはや大した強みにはならなくなってしまうのでしょう。磨こう、 コミュ力 。 MNTSQ株式会社 SRE 西室
アバター
はじめに MNTSQ はそのサービスの性質(「契約」の集約、一元管理、活用)上、セキュリティの維持と向上が至上命題です。 セキュリティへの取り組みには幾つかのアプローチがありますが、何が不足しているのか、どういった対処が必要かという点を突き止めるには情報が必要です。これはどういったアプローチを取るにしても共通して重要な観点と思います。 本稿はこの情報の獲得のためのログ収集範囲の拡充を行った記録となります。対象は Route 53 の DNS クエリログです。 なぜ DNS クエリログを取るか DNS クエリログはその名前の通り DNS へのクエリのログです。つまり いつ 誰が 何を どこから(「誰が」と同一の情報になる場合あり) が DNS クエリ単位で得られます。Route 53 で得られる DNS クエリログには以下2種類があります。 公開 DNS クエリログ: Public DNS query logging - Amazon Route 53 インターネットからの Route 53 公開 (public) hosted zone に対して発行された DNS クエリに関するログ リ ゾル バクエリログ: Resolver query logging - Amazon Route 53 VPC 内からインターネットに向けて発行された DNS クエリに関するログ VPC に紐付く Route 53 非公開 (private) hosted zone の名前解決は Route 53 リゾルバ が担い、ログ出力もこれが担う つまり Route 53 においては上述 DNS クエリログを インターネット → Route 53 公開 hosted zone(公開 DNS クエリログ) VPC → インターネット(リ ゾル バクエリログ) の2方向に関して収集することができます。これによって得られる情報はいくつか考えられますが、 インターネット → Route 53 公開 hosted zone 所謂 attack surface を狙われている形跡の確認 意図しないホストに対してのリク エス トが継続していないか等、接続エラーの確認 VPC → インターネット VPC 内から意図しない通信が発生していないかの確認 といったものがパッと思い付くだけでも挙げられます。実際にログを取ってみて初めて気付ける活用法もあるはずなので、まずはログを取ることを目的としてもよいでしょう。 DNS クエリログ収集構成 構成図を以下に示します。 AWSACC1 = Route 53 リソース稼動アカウント(図では1つだが実際には複数存在) Analysis = ログ分析用アカウント(1つのみ存在) Route 53 が生成する各 DNS クエリログを最終的には専用の AWS アカウント内に用意した S3 バケット に集約し、当該アカウントの Athena からログを解析する構成となります。 ログを必ずしも専用の AWS アカウントに集約する必要は無いのですが、今回は Athena でのログ検索時の利便性の面から、ログ集約先および活用場所をひとつの場所に絞るようにしました。S3 上にログを集約する取り組みが DNS クエリログについては初であった点も保存先選定の柔軟さに一役買っています。 図から判るとおり、 DNS クエリログによって S3 への保存方法が異なります。 リ ゾル バクエリログはログ出力先を複数選べ、選択肢の中には S3 がデフォルトで存在します *1 。 一方で公開 DNS クエリログについては CloudWatch Logs 以外にログを出力する選択肢はありません *2 。また CloudWatch Logs ロググループは us-east-1 にあるものだけが利用可 という制約もあります。 弊社でログ検索用に整備している Athena とその関連リソースは ap-northeast-1 にあることを前提にしているので、ここは出来れば ap-northeast-1 に寄せたいところです。このあたりを踏まえて公開 DNS クエリログについては Data Firehose を使い us-east-1 内で CloudWatch Logs から S3 へログを移設 S3 レプリケーション で us-east-1 から ap-northeast-1 へリージョンを跨いで最終目的地となる S3 バケット へログを保存 という構成をとるようにしました。 Terraform コード Route 53 ログを生成する側を submitter、ログを最終的に保管し Athena で検索する側を receiver とし、2つのコードを例示します。 前述の構成図でいえば AWSACC1 に相当するものが submitter、 Analysis に相当するものが receiver になります。 いずれも実際に使っているコードを改変しての例示となります。 submitter 以下を実施するコードです。Route 53 各ゾーン (private / public) および VPC は既に存在するものとします。 リ ゾル バクエリログを receiver 側 S3 バケット として保存 公開 DNS クエリログを us-east-1 の CloudWatch Logs ロググループに保存 us-east-1 にある CloudWatch Logs ロググループの内容を us-east-1 の S3 バケット へ Data Firehose を使い送出 後述の Athena でのログ解析の都合で dynamic partitioning ( https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html ) を有効にしています us-east-1 にある S3 バケット の内容を receiver 側の ap-northeast-1 下 S3 バケット へレプリケート main.tf data "aws_caller_identity" "current" {} # リゾルバクエリログ収集用コード resource "aws_route53_resolver_query_log_config" "main" { name = var.route53 [ "resolver_query_log" ][ "config_name" ] destination_arn = var.route53 [ "resolver_query_log" ][ "bucket_arn" ] } resource "aws_route53_resolver_query_log_config_association" "main" { resolver_query_log_config_id = aws_route53_resolver_query_log_config.main.id resource_id = var.vpc [ "id" ] } # 公開 DNS クエリログ収集用コード resource "aws_cloudwatch_log_group" "aws_route53_public" { provider = aws.us-east-1 name = var.route53 [ "public_dns_query_log" ][ "log_group_name" ] retention_in_days = 14 # S3 上のログを実運用上は使うので CloudWatch Logs には長期保管する必要がない } data "aws_iam_policy_document" "route53_query_logging" { statement { actions = [ "logs:CreateLogStream" , "logs:PutLogEvents" , ] resources = [ aws_cloudwatch_log_group.aws_route53_public.arn, ] principals { identifiers = [ "route53.amazonaws.com" ] type = "Service" } } } resource "aws_cloudwatch_log_resource_policy" "route53_public_query_logging_policy" { provider = aws.us-east-1 policy_document = data.aws_iam_policy_document.route53_query_logging.json policy_name = "$ { var.route53 [ "resolver_query_log" ][ "keyword" ]} -policy" } resource "aws_route53_query_log" "public" { provider = aws.us-east-1 depends_on = [ aws_cloudwatch_log_resource_policy.route53_public_query_logging_policy ] cloudwatch_log_group_arn = aws_cloudwatch_log_group.aws_route53_public.arn zone_id = aws_route53_zone.public.zone_id } # us-east-1 内で CloudWatch Logs から S3 へログを Data Firehose 経由で移す resource "aws_cloudwatch_log_subscription_filter" "s3_stream_filter" { provider = aws.us-east-1 name = "$ { var.route53 [ "public_dns_query_log" ][ "keyword" ]} -to-firehose" log_group_name = aws_cloudwatch_log_group.aws_route53_public.name # 全ログを転送対象にしたいので filter_pattern は空にする filter_pattern = "" destination_arn = aws_kinesis_firehose_delivery_stream.aws_route53_public.arn role_arn = aws_iam_role.route53_public_query_logs_to_firehose_role.arn } resource "aws_cloudwatch_log_group" "route53_public_firehose_log" { provider = aws.us-east-1 name = "/aws/kinesisfirehose/$ { aws_kinesis_firehose_delivery_stream.main.name } " retention_in_days = 14 # 最終保存先 S3 バケットにレプリケートされたログを実運用上では使うので長期間の保持は不要 } resource "aws_kinesis_firehose_delivery_stream" "main" { provider = aws.us-east-1 name = var.route53 [ "public_dns_query_log" ][ "keyword" ] destination = "extended_s3" extended_s3_configuration { role_arn = aws_iam_role.route53_public_query_logging_role.arn bucket_arn = aws_s3_bucket.aws_route53_public.arn buffering_size = 64 # MB 単位。dynamic partitioning が有効の場合必須 /* Data Firehose 内で Route 53 公開 DNS ログを Athena が解釈できる形式に変換する。詳細は後述 実施している内容は以下のとおり - CloudWatch Logs から Data Firehose に流れてくるログは gzip 圧縮されているのでこれを展開 - 展開した内容は1行に複数の JSON オブジェクトが含まれる形式になっているので jq を使い1行1オブジェクトになるよう展開 - S3 レプリケーションの事情で Data Firehose から S3 へログデータを置く場合は適当な prefix が欲しいので、これを "logs/" とできるよう設定 */ dynamic_partitioning_configuration { enabled = "true" } processing_configuration { enabled = "true" processors { type = "MetadataExtraction" parameters { parameter_name = "JsonParsingEngine" parameter_value = "JQ-1.6" } parameters { parameter_name = "MetadataExtractionQuery" parameter_value = "{prefix: {dummy: (\"logs\")} | .dummy}" # 実質的に "logs" という固定文字列を返すだけ } } processors { type = "Decompression" parameters { parameter_name = "CompressionFormat" parameter_value = "GZIP" } } processors { type = "AppendDelimiterToRecord" } } prefix = "!{partitionKeyFromQuery:prefix}/$ { data.aws_caller_identity.current.account_id } /!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/" error_output_prefix = "error/$ { data.aws_caller_identity.current.account_id } /!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/!{firehose:error-output-type}/" compression_format = "GZIP" cloudwatch_logging_options { enabled = true log_group_name = aws_cloudwatch_log_group.route53_public_firehose_log.name log_stream_name = "S3Delivery" } } } resource "aws_s3_bucket" "aws_route53_public" { provider = aws.us-east-1 bucket = var.route53 [ "public_dns_query_log" ][ "source_bucket_name" ] } resource "aws_s3_bucket_lifecycle_configuration" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.id rule { status = "Enabled" id = "delete after 180 days" expiration { days = 180 # 集約先 S3 バケット側のログを使うので然程長期間保持しておく必要はない } filter { prefix = "" } } } resource "aws_s3_bucket_versioning" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_public_access_block" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } resource "aws_s3_bucket_server_side_encryption_configuration" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.id rule { apply_server_side_encryption_by_default { sse_algorithm = "AES256" } } } data "aws_iam_policy_document" "aws_route53_public_bucket_policy" { provider = aws.us-east-1 statement { effect = "Allow" actions = [ "s3:PutObject" ] resources = [ "$ { aws_s3_bucket.aws_route53_public.arn } /*" ] principals { type = "Service" identifiers = [ "logging.s3.amazonaws.com" ] } condition { test = "ArnLike" variable = "aws:SourceArn" values = [ "arn:aws:s3:::mntsq-$ { var.env } -*" ] } condition { test = "StringEquals" variable = "aws:SourceAccount" values = [ data.aws_caller_identity.current.account_id ] } } } resource "aws_s3_bucket_policy" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.bucket policy = data.aws_iam_policy_document.aws_route53_public_bucket_policy.json } resource "aws_s3_bucket_replication_configuration" "route53_public_query_logging" { provider = aws.us-east-1 depends_on = [ aws_s3_bucket_versioning.aws_route53_public ] bucket = aws_s3_bucket.aws_route53_public.id role = aws_iam_role.route53_public_query_logging_replication.arn rule { id = "route53-public-dns-query-log-replication" status = "Enabled" filter { prefix = "logs" } delete_marker_replication { status = "Disabled" } destination { account = var.route53 [ "public_dns_query_log" ][ "destination_account_id" ] bucket = var.route53 [ "resolver_query_log" ][ "destination_bucket_arn" ] storage_class = "STANDARD_IA" access_control_translation { owner = "Destination" } } } } provider.tf provider "aws" { region = "ap-northeast-1" } provider "aws" { alias = "us-east-1" region = "us-east-1" } terraform { required_version = "~> 1.11.4" required_providers { aws = { source = "hashicorp/aws" version = "~> 6.0.0" } } } receiver submitter が生成した Route 53 ログを最終的に保管する S3 バケット を管理します。こちらはリージョンを跨がず ap-northeast-1 のみで完結するので、provider.tf の例示は省略します main.tf /* DNS クエリログを収集する対象となる AWS アカウントは AWS Organizations で管理している これらアカウントに対してのアクセス許可(S3 バケットポリシ)を個々設定するのは手間なので、organization 単位で許可できるようにする これには organization ID が要り、その値を得るための data */ data "aws_organizations_organization" "main" {} # リゾルバクエリログの最終保管場所となる S3 バケットとその周辺のリソースを定義 resource "aws_s3_bucket" "route53_resolver_query_logs" { bucket = var.s3 [ "resolver_query_logs" ][ "name" ] } resource "aws_s3_bucket_server_side_encryption_configuration" "route53_resolver_query_logs" { bucket = aws_s3_bucket.route53_resolver_query_logs.id rule { apply_server_side_encryption_by_default { sse_algorithm = "AES256" } } } resource "aws_s3_bucket_lifecycle_configuration" "route53_resolver_query_logs" { bucket = aws_s3_bucket.route53_resolver_query_logs.id rule { id = "transition to archives" transition { days = 30 storage_class = "STANDARD_IA" } transition { days = 60 storage_class = "GLACIER" } filter { prefix = "" } status = "Enabled" } } resource "aws_s3_bucket_public_access_block" "route53_resolver_query_logs" { bucket = aws_s3_bucket.route53_resolver_query_logs.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } data "aws_iam_policy_document" "route53_resolver_query_logs" { statement { effect = "Allow" actions = [ "s3:GetBucketAcl" ] resources = [ aws_s3_bucket.route53_resolver_query_logs.arn, ] principals { type = "Service" identifiers = [ "delivery.logs.amazonaws.com" ] } } statement { effect = "Allow" actions = [ "s3:PutObject" ] resources = [ "$ { aws_s3_bucket.route53_resolver_query_logs.arn } /*" , ] principals { type = "Service" identifiers = [ "delivery.logs.amazonaws.com" ] } condition { test = "StringEquals" variable = "s3:x-amz-acl" values = [ "bucket-owner-full-control" , ] } condition { test = "StringEquals" variable = "aws:PrincipalOrgID" values = [ data.aws_organizations_organization.main.id ] } } } resource "aws_s3_bucket_policy" "route53_resolver_query_logs" { bucket = aws_s3_bucket.route53_resolver_query_logs.id policy = data.aws_iam_policy_document.route53_resolver_query_logs.json } # 公開 DNS クエリログの最終保管場所となる S3 バケットとその周辺のリソースを定義 resource "aws_s3_bucket" "route53_public_dns_query_logging" { bucket = var.s3 [ "public_dns_query_logs" ][ "name" ] } resource "aws_s3_bucket_versioning" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_public_access_block" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } resource "aws_s3_bucket_ownership_controls" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id rule { object_ownership = "BucketOwnerPreferred" } } resource "aws_s3_bucket_server_side_encryption_configuration" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id rule { apply_server_side_encryption_by_default { sse_algorithm = "AES256" } } } data "aws_iam_policy_document" "route53_public_dns_query_logging" { statement { effect = "Allow" actions = [ "s3:ReplicateObject" , "s3:ReplicateDelete" , "s3:ReplicateTags" , "s3:GetObjectVersionTagging" , "s3:ObjectOwnerOverrideToBucketOwner" , ] resources = [ "$ { aws_s3_bucket.route53_public_dns_query_logging.arn } /*" ] principals { type = "AWS" identifiers = [ "*" ] } condition { test = "StringEquals" variable = "aws:PrincipalOrgID" values = [ data.aws_organizations_organization.main.id ] } } statement { effect = "Allow" actions = [ "s3:GetBucketVersioning" , "s3:PutBucketVersioning" , "s3:ListBucket" , "s3:GetReplicationConfiguration" , ] resources = [ aws_s3_bucket.route53_public_dns_query_logging.arn ] principals { type = "AWS" identifiers = [ "*" ] } condition { test = "StringEquals" variable = "aws:PrincipalOrgID" values = [ data.aws_organizations_organization.main.id ] } } statement { effect = "Allow" actions = [ "s3:PutObject" ] resources = [ "$ { aws_s3_bucket.route53_public_dns_query_logging.arn } /*" , ] principals { type = "Service" identifiers = [ "logging.s3.amazonaws.com" ] } condition { test = "StringEquals" variable = "s3:x-amz-acl" values = [ "bucket-owner-full-control" ] } condition { test = "StringEquals" variable = "aws:PrincipalOrgID" values = [ data.aws_organizations_organization.main.id ] } } } resource "aws_s3_bucket_policy" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id policy = data.aws_iam_policy_document.route53_public_dns_query_logging.json } 公開 DNS クエリログの取り扱いについての注意 上記サンプルコード内で Data Firehose を使い CloudWatch Logs から S3 へ公開 DNS クエリログを送出する過程で、何やら小難しいことをしている箇所に目が付くと思います。 extended_s3_configuration { role_arn = aws_iam_role.route53_public_query_logging_role.arn bucket_arn = aws_s3_bucket.aws_route53_public.arn buffering_size = 64 # MB 単位。dynamic partitioning が有効の場合必須 /* Data Firehose 内で Route 53 公開 DNS ログを Athena が解釈できる形式に変換する。詳細は後述 実施している内容は以下のとおり - CloudWatch Logs から Data Firehose に流れてくるログは gzip 圧縮されているのでこれを展開 - 展開した内容は1行に複数の JSON オブジェクトが含まれる形式になっているので jq を使い1行1オブジェクトになるよう展開 - S3 レプリケーションの事情で Data Firehose から S3 へログデータを置く場合は適当な prefix が欲しいので、これを "logs/" とできるよう設定 */ dynamic_partitioning_configuration { enabled = "true" } processing_configuration { enabled = "true" processors { type = "MetadataExtraction" parameters { parameter_name = "JsonParsingEngine" parameter_value = "JQ-1.6" } parameters { parameter_name = "MetadataExtractionQuery" parameter_value = "{prefix: {dummy: (\"logs\")} | .dummy}" # 実質的に "logs" という固定文字列を返すだけ } } processors { type = "Decompression" parameters { parameter_name = "CompressionFormat" parameter_value = "GZIP" } } processors { type = "AppendDelimiterToRecord" } } これは Athena でログを処理することを前提とした前処理を Data Firehose のみで(= ログ処理用の Lambda 関数等を噛ませないで)実施する為の措置です。 通常 CloudWatch Logs にある Route 53 公開 DNS クエリログを Data Firehose でシンプルに S3 へ送出すると以下のような改行なしで複数の JSON オブジェクトが1行に集約されたものが得られます(実際のログを適当にマスクし例示します)。 { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/YVR52-R2 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522674731119363142938582278340608 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP YVR52-R2 192.0.2.143 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO6-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522685262133562904066894168588288 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP SFO6-SO1 192.0.2.144 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO9-SN1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522690947843897953596035415605248 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com AAAA NOERROR UDP SFO9-SN1 192.0.2.144 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SEA900-R3 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522692678857209236868848315727872 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP SEA900-R3 2001:DB8::143 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO6-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522703737195603523266279501332480 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP SFO6-SO1 192.0.2.144 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/ATL58-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047077085475805385546231824257833115761174552619646976 "," timestamp ": 1750931493000 ," message ":" 1.0 2025-06-26T09:51:33Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP ATL58-R1 192.0.2.148 192.0.2.0/24 " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/NRT8-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047077085475805385546231840537028970579482687526338560 "," timestamp ": 1750931493000 ," message ":" 1.0 2025-06-26T09:51:33Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP NRT8-SO1 192.0.2.10 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/KUL51-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047082905970302202038872078382990508713243113432088576 "," timestamp ": 17509 31754000,"message":"1.0 2025-06-26T09:55:54Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP KUL51-R1 192.0.2.152 192.0.2.0/24"}]} ところがこの形式の JSON ログは Athena では受け付けられません。Athena は1行1エントリの JSON ログを要求するためです *3 。つまり上記例でいえば { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/YVR52-R2 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522674731119363142938582278340608 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP YVR52-R2 192.0.2.143 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO6-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522685262133562904066894168588288 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP SFO6-SO1 192.0.2.144 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO9-SN1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522690947843897953596035415605248 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com AAAA NOERROR UDP SFO9-SN1 192.0.2.144 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SEA900-R3 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522692678857209236868848315727872 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP SEA900-R3 2001:DB8::143 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO6-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522703737195603523266279501332480 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP SFO6-SO1 192.0.2.144 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/ATL58-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047077085475805385546231824257833115761174552619646976 "," timestamp ": 1750931493000 ," message ":" 1.0 2025-06-26T09:51:33Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP ATL58-R1 192.0.2.148 192.0.2.0/24 " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/NRT8-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047077085475805385546231840537028970579482687526338560 "," timestamp ": 1750931493000 ," message ":" 1.0 2025-06-26T09:51:33Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP NRT8-SO1 192.0.2.10 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/KUL51-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047082905970302202038872078382990508713243113432088576 "," timestamp ": 1750931754000 ," message ":" 1.0 2025-06-26T09:55:54Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP KUL51-R1 192.0.2.152 192.0.2.0/24 " }]} のような JSON ログとなっている必要があります。よって Data Firehose から S3 へログを流す際にその中身を書き換える必要が出てきます。Data Firehose ではデータ処理の過程で Lambda 関数にその役目を担わせることができるので *4 それを使うのも手ですが、お世話が必要な主体をあまり増やしたくありません。 同じような事例が無いか調査していたところ medium.com が基本的な構想として多いに参考になり、また S3 による レプリケーション を考える場合の prefix 付与においては dev.classmethod.jp が大変参考になりました。つまりはコード中のコメントにもあるとおり CloudWatch Logs から Data Firehose に流れてくるログは gzip 圧縮されているのでこれを展開 展開した内容は1行に複数の JSON オブジェクトが含まれる形式になっているので jq を使い1行1オブジェクトになるよう展開 S3 レプリケーション の事情で Data Firehose から S3 へログデータを置く場合は適当な prefix が欲しいので、これを "logs/" とできるよう設定 を Data Firehose のみで実施することが出来、これは dynamic partitioning *5 によって達成が可能ということになります。 本来 dynamic partitioning はログに含まれるキー値から S3 へオブジェクトを保存する際の prefix を決定し Athena をはじめとする S3 をデータソースとする解析系サービス向けに パーティション を整備するための機能ですが、弊社のケースではそこまで凝ったことは不要で、 JSON ログをその構造を維持しつつログエントリ単位で適当に改行したいという共有が満たせれば OK です。先に示したコードも processing_configuration ブロックが割合シンプルなものになっています。 このコードは前述2記事に拠るところが多大に有ります。この場を借りて感謝申し挙げます。 ログを Athena で検索する さて前項までに Route 53 由来の DNS クエリログを S3 に集約して保存できるようになりました。これを Athena で検索してゆくようにする手筈を整えます。 リ ゾル バクエリログについては Use partition projection - Amazon Athena で示される内容が充分実用に耐えるものになりますが、公開 DNS クエリログについては AWS としての公式サポートが CloudWatch Logs であるということを踏まえても想像に難くなく、このようなクエリ例が存在しません。従って自前で頑張る必要があります。 早い話以下のような内容が使えます。前述の Data Firehose コードによって S3 へログが送出される前提の内容です。例示値や置換すべき値は前述のリ ゾル バクエリ用の例に倣っています。 CREATE EXTERNAL TABLE r53_public_dns_logs ( messageType string, owner string, logGroup string, logStream string, subscriptionFilters array< string >, logEvents array< struct< id: string, timestamp : bigint, message: string > > ) PARTITIONED BY ( `datehour` string ) ROW FORMAT SERDE ' org.openx.data.jsonserde.JsonSerDe ' STORED AS INPUTFORMAT ' org.apache.hadoop.mapred.TextInputFormat ' OUTPUTFORMAT ' org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat ' LOCATION ' s3://amzn-s3-demo-bucket/route53-public-dns-query-logging/logs/aws_account_id/ ' TBLPROPERTIES( ' projection.enabled ' = ' true ' , ' projection.datehour.type ' = ' date ' , ' projection.datehour.range ' = ' 1970/01/01/00,NOW ' , ' projection.datehour.format ' = ' yyyy/MM/dd/HH ' , ' projection.datehour.interval ' = ' 1 ' , ' projection.datehour.interval.unit ' = ' HOURS ' , ' storage.location.template ' = ' s3://amzn-s3-demo-bucket/route53-public-dns-query-logging/logs/aws_account_id/$${datehour}/ ' ) 確かにこれで公開 DNS クエリログを検索できるのですが、クエリの内容をみてもわかるとおり、ログの中身で最も知りたい筈の DNS クエリ周辺の状況 ( logEvents[].message ) が string として扱われるに留まっており、少々厄介です。例えば { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/KUL51-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047082905970302202038872078382990508713243113432088576 "," timestamp ": 1750931754000 ," message ":" 1.0 2025-06-26T09:55:54Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP KUL51-R1 192.0.2.152 192.0.2.0/24 " }]} というログが有ったとして、この中で真に知りたいのは 1.0 2025-06-26T09:55:54Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP KUL51-R1 192.0.2.152 192.0.2.0/24 であって、これを適当な列に分割したうえで列に対して具体的な値やパターン等を当て嵌めて検索するということが本来やりたいことです。とれる手段は Data Firehose でのデータ処理時に Lambda 関数を噛ませて JSON ログ中 logEvents[].message だけを S3 へ送出する対象とする ログは加工せず、Athena で頑張る というものが考えられそうですが、弊社のケースでは先述の通り「お世話が必要な主体をあまり増やしたくない」ので、Athena で頑張る方法を選びました。具体的には ログデータを直接扱い Athena 上で取り回しのしやすい構造にする為のテーブル 上述の Athena テーブル定義による ログデータから DNS クエリログに関する内容(= logEvents[].message )だけを検索対象とするビュー 後述 といったようにテーブル以外に Athena ビュー *6 を用意することで対処しています。具体的には以下のようなビュー定義を使用しています。 -- Route 53 公開 DNS クエリログを Athena で扱うためのビュー -- r53_public_dns_logs というテーブルを元ネタとして DNS クエリログを直接 Athena で検索できるようにするためのもの CREATE OR REPLACE VIEW r53_public_dns_log_view AS SELECT -- 正規表現を使い、message フィールドを仮想的な列に分割 -- 正規表現の各( )がキャプチャグループ(1から始まるインデックス)に対応 regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 1 ) AS version, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 2 ) AS timestamp , regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 3 ) AS hosted_zone_id, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 4 ) AS name, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 5 ) AS type , regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 6 ) AS response_code, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 7 ) AS protocol, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 8 ) AS edge_location, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 9 ) AS r_ip, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 10 ) AS c_ip, l.datehour FROM -- 対応する Athena テーブル名を指定する -- ログ内容は datehour によってパーティションが切られているのでビューでもこれを使えるようにする r53_public_dns_logs l CROSS JOIN UNNEST(l.logEvents) AS t(e) -- 元のログにおける logEvents 配列を展開 ここで作成したビューを対象として検索を実施することで、公開 DNS クエリログもリ ゾル バクエリログと同等の使い勝手で Athena にて取り回すことが可能になります。 おわりに Route 53 由来の DNS クエリログを Athena で取り扱う方法について解説しました。 S3 へのログ保存が公式にサポートされているリ ゾル バクエリログでは Athena による検索およびその運用に関する tips が数多く見付かる一方、公開 DNS クエリログについては CloudWatch Logs 以外の場所での保管を自前で頑張らないといけない事情で、ログ検索それ自体の tips は然程多くない現状があります。 両方の DNS クエリログを同等の手段(本記事では Athena + S3 ベースで)で横断して追跡できるようにすることで、ログ利用の手間感の低減や新たな洞察を得ることの切っ掛けになるはずです。実際に弊社ではこの手法で DNS クエリログを割合気軽に追えるようになったことで、これまであまりケアできていなかった DNS 関連の運用改善や外部からのリク エス ト調査に新たな観点を導入するといった効果が得られ、予想よりも多くの嬉しさがありました。 DNS クエリログ収集やその運用改善に本稿が一助となれば幸いです。 MNTSQ 株式会社 SRE 秋本 *1 : AWS resources that you can send Resolver query logs to - Amazon Route 53 *2 : Public DNS query logging - Amazon Route 53 *3 : https://repost.aws/ja/knowledge-center/error-json-athena *4 : https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html *5 : https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html *6 : https://docs.aws.amazon.com/athena/latest/ug/views.html
アバター
こんにちは、MNTSQ( モンテスキュー )で アルゴリズム エンジニアをしている清水です。 MNTSQは契約書を解析・管理・検索するプロダクトを提供しています。これらのプロダクトには大規模 言語モデル (以下LLM)が搭載された機能が実装されています。また、LLMを活用した新プロダクトも鋭意開発中です。 LLMをアプリケーションに組み込む際の大きな課題の一つとして、 「LLMの出力形式(型)を如何に矯正するか?」 が挙げられます。単純なチャットアプリケーションであればそこまで問題にはなりませんが、LLMによる生成結果を後続のプログラムで処理する必要がある場合、事前に定義された型に従って出力を生成する必要があります。 現在、複数のLLMサービスで出力形式を制御する機能が搭載されていますが、本記事では Google が提供している Gemini の Structured output を取り上げます。本記事では、 開発の過程で得られた、GeminiのStructured outputにおける7つのTips を紹介したいと思います。 サンプルコード 例として以下のように Python の Google Gen AI SDK ( google -genai)を使用することを想定しています。 google -genaiでは types.GenerateContentConfig の response_schema に Pydantic のモデルを渡すことで、Structured outputを使用することができます。 本記事ではStructured outputの機能にフォーカスするのでプロンプトは最低限の内容にしています。また、 スキーマ として指定するPydanticモデルも、タイトルの抽出と契約書かどうかを判定するだけのシンプルなものにしています。また、Gemini API ではなく Vertex AI の API を介してGeminiを使用します。(ほとんどのケースでGemini API に対しても同じTipsが適用できると思いますが、一部仕様が異なる可能性があります。) from google.genai import Client, types from pydantic import BaseModel PROMPT_TEMPLATE = """ \ JSONスキーマに従って、ドキュメントの内容を分析してください。 <json_schema> {json_schema} </json_schema> <document> {document_text} </document> """ class ContractAnalysisResult (BaseModel): document_name: str is_contract: bool def analyze_contract (document_text: str ) -> ContractAnalysisResult: client = Client(vertexai= True , project= "development" , location= "global" ) prompt = PROMPT_TEMPLATE.format( json_schema=ContractAnalysisResult.model_json_schema(), document_text=document_text, ) contract_analysis_result = client.models.generate_content( model= "gemini-2.5-flash" , contents=prompt, # response_schemaにPydanticモデルを渡す config=types.GenerateContentConfig( response_schema=ContractAnalysisResult, ), ) return contract_analysis_result Tips 1: プロンプトに JSON スキーマ を含めない 一番簡単に試すことができるテクニックは、「プロンプトに JSON スキーマ を含めない」です。実は、 response_schema を設定した場合は、 JSON スキーマ をプロンプトに含めない ことが 公式のドキュメント で推奨されています。 警告:   responseSchema  を構成する場合は、テキスト プロンプトで スキーマ を指定しないでください。これにより、予期しない結果や品質の低い結果が生じる可能性があります。 以下のサンプルコードでは、上記のサンプルコードから JSON スキーマ を埋め込んでいた箇所を削除しています。 # response_schemaを指定する際には、JSONスキーマをプロンプトに含めない PROMPT_TEMPLATE = """ \ JSONスキーマに従って、ドキュメントの内容を分析してください。 <document> {document_text} </document> """ Structured outputについてすべての仕組みが詳細に明かされているわけではないので、なぜ JSON スキーマ をプロンプトに含めないことが推奨されるのか技術的な理由は分かりません。公式のドキュメントで”don't duplicate the schema in the text prompt.”と書いてあることから、重複した情報をLLMに与えることが悪影響を及ぼすのかもしれません。 また、OpenAIやAnthropicのドキュメントには同様の記述は見当たらず(見逃していたらすみません)、Gemini特有の性質である可能性もあります。 Tips 2: title と description を設定する JSON スキーマ の各フィールドにおいて、 自然言語 による説明を付けたい時は以下のように title や description フィールドを使いましょう 。Structured outputでは JSON スキーマ による構造化されたデータしか渡せないと勘違いされがちですが、LLMらしく 自然言語 による情報も与えることができます。 from pydantic import BaseModel, Field class ContractAnalysisResult (BaseModel): document_name: str = Field( title= "タイトル (Title)" , description= "ドキュメント冒頭に記載されているタイトル" , ) is_contract: bool = Field( title= "契約書かどうか (Is Contract)" , description= "ドキュメントが契約書かどうかを示す。就業規則や賃金規定などは契約書ではない。" , ) ここで定義された title と description は Vertex AIのAPIリファレンス で記述されている responseSchema フィールドの title フィールドと description フィールドに渡されます。(詳しくは次の項目で言及します) Tips 3: API に渡されるパラメータを確認する Pydanticモデルを response_schema に渡すだけでStructured outputを使用できますが、 最終的にどのような形式で API に渡されるのかを確認する ことは有効です。以下のようにして、 response_schema に渡したPydanticモデルが、どのように API に渡すためのパラメータに変換されるかを確認することができます。 from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel, Field class ContractAnalysisResult (BaseModel): document_name: str = Field( title= "タイトル (Title)" , description= "ドキュメント冒頭に記載されているタイトル" , ) is_contract: bool = Field( title= "契約書かどうか (Is Contract)" , description= "ドキュメントが契約書かどうか。就業規則や賃金規定などは契約書ではない。" , ) if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) # _transformers.t_schemaにClientオブジェクトとPydanticモデルを渡す request_params = t.t_schema(client, ContractAnalysisResult) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果 { " properties ": { " document_name ": { " description ": " ドキュメント冒頭に記載されているタイトル ", " title ": " タイトル (Title) ", " type ": " STRING " } , " is_contract ": { " description ": " ドキュメントが契約書かどうか。就業規則や賃金規定などは契約書ではない。 ", " title ": " 契約書かどうか (Is Contract) ", " type ": " BOOLEAN " } } , " property_ordering ": [ " document_name ", " is_contract " ] , " required ": [ " document_name ", " is_contract " ] , " title ": " ContractAnalysisResult ", " type ": " OBJECT " } 例えば、私は開発の過程で以下のような google -genaiの仕様(というよりはバグ?)を見つけました 1 。 下記のように、Pydanticモデルが 入れ子 構造になっている スキーマ において、以下のように親モデルの Field において title と description を設定します。 from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel, Field class ContractTerm (BaseModel): effective_date: str expiration_date: str class ContractAnalysisResult (BaseModel): contract_term: ContractTerm = Field( title= "契約期間 (Contract Term)" , description= "契約有効日と失効日から構成される契約の期間。" , ) if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, ContractAnalysisResult) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) この スキーマ の t_schema の出力を確認してみると、以下のように title と description が消えてしまっていることが確認できます。( title はデフォルト値のクラス名( ContractTerm )が代わりに格納されています。) { " properties ": { " contract_term ": { " properties ": { " effective_date ": { " title ": " Effective Date ", " type ": " STRING " } , " expiration_date ": { " title ": " Expiration Date ", " type ": " STRING " } } , " property_ordering ": [ " effective_date ", " expiration_date " ] , " required ": [ " effective_date ", " expiration_date " ] , " title ": " ContractTerm ", " type ": " OBJECT " } } , " required ": [ " contract_term " ] , " title ": " ContractAnalysisResult ", " type ": " OBJECT " } 以下のように親モデルの Field ではなく子モデルの ConfigDict で title を、docstringで description を設定すると、問題なく変換されます。 from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel, ConfigDict class ContractTerm (BaseModel): """契約有効日と失効日から構成される契約の期間。""" # docstringを設定するとdescriptionとして認識される model_config = ConfigDict(title= "契約期間 (Contract Term)" ) effective_date: str expiration_date: str class ContractAnalysisResult (BaseModel): contract_term: ContractTerm if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, ContractAnalysisResult) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果 { " properties ": { " contract_term ": { " description ": " 契約有効日と失効日から構成される契約の期間。 ", " properties ": { " effective_date ": { " title ": " Effective Date ", " type ": " STRING " } , " expiration_date ": { " title ": " Expiration Date ", " type ": " STRING " } } , " property_ordering ": [ " effective_date ", " expiration_date " ] , " required ": [ " effective_date ", " expiration_date " ] , " title ": " 契約期間 (Contract Term) ", " type ": " OBJECT " } } , " required ": [ " contract_term " ] , " title ": " ContractAnalysisResult ", " type ": " OBJECT " } 思ったように型の矯正が効かないときはこのようなエッジケースを踏んでいるのかもしれません。そのような時は、この方法を使って API に渡されるパラメータを確認すると良いでしょう。 Tips 4: date 型や datetime 型を使用する スキーマ を定義するPydanticモデルの各フィールドおいて、 Python の date 型や datetime 型を使用する ことができます。以下のPydanticモデルを t_schema に渡すと以下のようなパラメータに変換されていることが確認できます。 from datetime import date from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel class ContractTerm (BaseModel): effective_date: date # str型ではなくdate型を指定 expiration_date: date if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, ContractTerm) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果( "format": "date", となっている箇所に注目してください) { " properties ": { " effective_date ": { " format ": " date ", " title ": " Effective Date ", " type ": " STRING " } , " expiration_date ": { " format ": " date ", " title ": " Expiration Date ", " type ": " STRING " } } , " property_ordering ": [ " effective_date ", " expiration_date " ] , " required ": [ " effective_date ", " expiration_date " ] , " title ": " ContractTerm ", " type ": " OBJECT " } この format フィールドは title や description と同様に API の responseSchema でサポートされているフィールドです。ただし、どのようなformatでも良いわけではなく現状は date 、 date-time 、 time 、 duration のみが サポートされているようです 。それぞれ Python のdatetimeライブラリの date クラス、 datetime クラス、 time クラス、 timedelta クラスが対応しています 2 。 from datetime import date, datetime, time, timedelta from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel class DateTimeClasses (BaseModel): date_field: date datetime_field: datetime time_field: time duration_field: timedelta if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, DateTimeClasses) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果 { " properties ": { " date_field ": { " format ": " date ", " title ": " Date Field ", " type ": " STRING " } , " datetime_field ": { " format ": " date-time ", " title ": " Datetime Field ", " type ": " STRING " } , " time_field ": { " format ": " time ", " title ": " Time Field ", " type ": " STRING " } , " duration_field ": { " format ": " duration ", " title ": " Duration Field ", " type ": " STRING " } } , ... } Tips 5: その他サポートされている API のフィールドを使用する responseSchema がサポートしているフィールドは、上記で紹介した title 、 description 、 format フィールド以外にもあります。詳しくは 公式ドキュメント をご参照ください。これらのフィールドはPydanticで以下のように表現できます。 from datetime import date, datetime, time, timedelta from enum import Enum from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel, Field class EnumClass (Enum): A = "a" B = "b" C = "c" class Schema (BaseModel): number_field: int = Field(ge= 1 , le= 10 ) string_field: str = Field(min_length= 1 , max_length= 10 ) list_field: list [ int ] = Field(min_items= 1 , max_items= 10 ) with_pattern_field: str = Field(pattern= r"^[a-z]+$" ) # examplesを渡すとエラーになるので注意 with_example_field: str = Field(json_schema_extra={ "example" : "example string" }) nullable_field: str | None = Field(default= None ) any_of_field: str | int enum_field: EnumClass if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, Schema) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果 { " properties ": { " number_field ": { " maximum ": 10.0 , " minimum ": 1.0 , " title ": " Number Field ", " type ": " INTEGER " } , " string_field ": { " max_length ": 10 , " min_length ": 1 , " title ": " String Field ", " type ": " STRING " } , " list_field ": { " items ": { " type ": " INTEGER " } , " max_items ": 10 , " min_items ": 1 , " title ": " List Field ", " type ": " ARRAY " } , " with_pattern_field ": { " pattern ": " ^[a-z]+$ ", " title ": " With Pattern Field ", " type ": " STRING " } , " with_example_field ": { " example ": " example string ", " title ": " With Example Field ", " type ": " STRING " } , " nullable_field ": { " nullable ": true , " title ": " Nullable Field ", " type ": " STRING " } , " any_of_field ": { " any_of ": [ { " type ": " STRING " } , { " type ": " INTEGER " } ] , " title ": " Any Of Field " } , " enum_field ": { " enum ": [ " a ", " b ", " c " ] , " title ": " EnumClass ", " type ": " STRING " } } , " property_ordering ": [ " number_field ", " string_field ", " list_field ", " with_pattern_field ", " with_example_field ", " nullable_field ", " any_of_field ", " enum_field " ] , " required ": [ " number_field ", " string_field ", " list_field ", " with_pattern_field ", " with_example_field ", " any_of_field ", " enum_field " ] , " title ": " Schema ", " type ": " OBJECT " } Tips 6: エラー回避のためのvalidatorを実装する 上記で紹介した date 型のフィールドや max_ength などはPydanticモデルの制約として働きます。例えば、 date 型のフィールドに無効な日付の文字列が代入されるとエラーになります。また、 max_length=10 と指定されているフィールドに11文字以上の文字列が渡されると同じくエラーになります。 この時、 Geminiがこれらの制約に違反した JSON を生成する可能性がある ことに注意が必要です。一定の矯正力はありますが、100%制約を守ってくれるわけではありません 3 。制約に違反したテキストが生成されたときにエラーにならないように、 生成されたテキストを加工するvalidatorを実装しておく と安全でしょう。 例えば私は、 date 型のフィールドに対して 0000-01-01 のような無効な日付をGeminiが生成するケースを観測したことがあります。この場合、以下のようなvalidatorを実装してエラーを回避すると良いでしょう。 import logging from datetime import date from typing import Any from pydantic import BaseModel, Field, ModelWrapValidatorHandler, ValidationError, field_validator class EffectiveDate (BaseModel): effective_date: date | None = Field(default= None ) @ field_validator ( "effective_date" , mode= "wrap" ) def date_parsing_validator (value: Any, handler: ModelWrapValidatorHandler[Any]) -> Any: """0000-01-01のような無効な日付をNoneに変換する""" try : return handler(value) except ValidationError as e: if "date_parsing" in (error[ "type" ] for error in e.errors()): logging.warning(f "Invalid date: {value}" ) return None else : raise e if __name__ == "__main__" : # Geminiが0000-01-01のような無効な日付を生成したと想定 effective_date = EffectiveDate.model_validate_json( '{"effective_date": "0000-01-01"}' ) print (effective_date) # WARNING:root:Invalid date: 0000-01-01 # effective_date=None Tips 7: Chain of Thoughtを意識する Chain of Thought (CoT)とは、結論だけでなく推論の過程も生成させることでLLMの性能を向上させる手法のことです。通常はプロンプトを工夫したり、専用にチューニングされたモデルを使用することでCoTを実現するのですが、 response_schema を工夫することで擬似的なCoTを実現することができます 。 例として、以下のようなPydanticモデルを定義します。 from datetime import date from pydantic import BaseModel, Field class ContractTerm (BaseModel): effective_date: date | None = Field(description= "契約期間の有効日" ) period: int | None = Field(description= "契約が持続する期間" ) expiration_date: date | None = Field(description= "契約期間の失効日" ) 欲しいのは effective_date と expiration_date だけですが、同時に period も抽出するようにしています。このようにすることで、例えば「本契約は2025年1月1日から3年間有効とする」のように契約失効日が直接的に書かれていない場合でも、事前に抽出した effective_date と period から expiration_date を計算してくれる効果が期待できます。 このように、フィールドを定義する順番を工夫したり、関連する情報を抽出するように促すことで、擬似的なCoTが期待できるでしょう。 注意事項 本記事で紹介した内容は、2025年6月時点のGemini/Vertex AIの仕様と、 google -genaiのバージョン1.19.0の仕様に基づいています。今後のアップデートによってGeminiや SDK の仕様が変更される可能性があります。実際に利用される際は、必ず公式のドキュメントをご確認いただくようお願いします。 まとめ 本記事では、GeminiのStructured outputでレスポンスの型を矯正するためのTipsをいくつか紹介しました。開発で得られた知見を全て盛り込んだら想定よりも多い文字数になってしまいました。是非開発のヒントにしていただけたら幸いです。 冒頭でも触れましたが、MNTSQではLLMを活用したプロダクトを鋭意開発中です。もしMNTSQの仕事にご興味を持っていただけたら、 ぜひお気軽にカジュアル面談でお話ししましょう! careers.mntsq.co.jp note.mntsq.co.jp tech.mntsq.co.jp この記事を書いた人 清水健 吾 MNTSQ アルゴリズム エンジニア LLMのご機嫌と格闘する日々です。 google -genaiバージョン1.19.0時点での動作です。バグであれば今後解消されるかもしれません。 ↩ 他にも Pendulum も対応しています。 ↩ どの程度の矯正力を持つかはフィールドによって異なるようです。例えば私の場合 enum フィールドに違反したケースに遭遇したことはありません。反対に min_length , max_length は矯正力が弱く、validatorの実装は必須だと思われます。 ↩
アバター
openapi-ts 導入 こんにちは、MNTSQ のソフトウェアエンジニアの森山です。今回は、 REST API の OpenAPI 3.0 から API クライアントを自動生成するまでの過程を紹介します。 実はメインのプロダクトへ TypeScript を導入できたのはつい最近のことです。 API クライアントを自動生成するまでの苦労や新たな発見が 1 つでも参考になれば嬉しく思います。 課題 API クライアントの自動生成に取り組む上で、現在の BE と FE には以下の課題がありました。 BE API フレームワーク 移行期のため、OpenAPI 2.0 と 3.0 の 2 つの API 定義ファイルが存在し、自動生成前に merge が必要。 FE TypeScript へ移行できていない JavaScript が大半。 API コールを堅牢にするための独自の機構が複雑で認知負荷が高い。 API レスポンスが class 化されているが TypeScript の型として利用できない。 自動生成の目的 型や API クライアントの自動生成の目的は以下です。 よりシンプルな API コール API の破壊的変更を検知 TypeScript の導入を加速 詳細な背景は以下の通りです。 よりシンプルな API コール 独自の機構を撤廃し、 API コール処理の認知負荷を下げたい。TypeScript の型でよりシンプルに解決できる。 API の破壊的変更を検知 既存の API コールを堅牢化する機構はランタイム上で動作します。そのため API の破壊的な変更を開発中に見逃すことがありました。開発中に 機械的 に検知できる必要がある。 TypeScript の導入を加速 型が自動生成できると以下の要因で加速できる。 TypeScript を導入したばかりで使える型が少ないが、一気に使える型が増える。 型のメンテナンス 工数 が削減できる。 過剰なプロパティを持った型が生まれない。(流用性を高める意図で生まれやすい) ライブラリの比較 以下の 3 つのライブラリが検討の対象です。 openapi-ts(採用) openapi-typescript swagger-typescript- api 結果としては 1. openapi-ts を採用しました。 次にその選定における観点と過程を説明します。 比較観点 型の流用性 API コール時の認知負荷 型の流用性 特に API のパラメータやレスポンスの型が流用しやすい形式であるか。それらの型は API コールの前後の加工処理等で参照したいことがあります。出力される型が API コールの関数のみだと、その関数の型から引数や返り値の型を抜き出す必要があるため複雑になります。 API コール時の認知負荷 API コール時のインターフェースがシンプルかどうか。 API コールのために関数や型をいくつも import したくないです。関数名を書いただけで補完が始まり実装が自然と進んでいく体験が理想です。 以下は上記の観点を具体化した比較表です。 ライブラリ api クライアントの生成 snake_case ↔ camelCase の変換 自動生成時の安定性 型の流用性 API コールに必要な import 数 endpoint の型制御 path parameter の型制御 query parameter の型制御 request body の型制御 response body の型注釈 openapi-ts ◯ ◯ ◯ ◯ △ ◯ ◯ ◯ ◯ ◯ openapi-typescript x ◯ ◯ ◯ x △ △ ◯ ◯ ◯ swagger-typescript- api ◯ ◯ x △ ◯ ◯ ◯ ◯ ◯ ◯ 各ライブラリごとにプロトタイプを実装しました。手を動かして得た発見と評価も合わせて以下に記載します。 openapi-ts メリット API のパラメータ、リク エス トの型が独立して定義されている API コールの関数を自動生成できる API クライアント(fetch, axios, …etc)を選択できる あらゆる型補完が効く デメリット API クライアントの インスタンス を API コールの関数に都度渡す API コールの際に API クライアントの インスタンス として毎回同じものを渡すのが冗長です。しかし、それ以外は観点を満たしていました。 実装例 import { typedAxios } from "./client" import { postV2Authentication } from "./generated/sdk.gen" import { getV2DocumentDocumentId } from "./generated/sdk.gen" import { getV2DocumentDiff } from "./generated/sdk.gen" // 認証 postV2Authentication( { client : typedAxios, // fetchやaxios等のAPIクライアントを毎回渡す必要がある body : { email , password } , } ) // document取得 getV2DocumentDocumentId( { client : typedAxios, path : { documentId : 1 } , } ) // user取得 getV2User( { client : typedAxios, query : { userId : 1 } , } ) requestBody の型参照も簡単です。 import { typedAxios } from "./client" import { postV2Authentication } from "~/api/openapi-ts/generated/sdk.gen" import type { PostV2AuthenticationData } from "./generated" // requestBodyの型をimport(pathパラメータ、queryパラメータも可) export const authentication = async ( { email , password } : PostV2AuthenticationData[ "body" ]) => { // ...何か前処理をしたり const response = await postV2Authentication( { client : typedAxios, body : { email , password } , } ) return response.data } openapi-typescript メリット 型のみの生成でカスタマイズ性が高い デメリット API コールの関数生成には派生ライブラリの openapi-fetch が必要( API クライアントは fetch 限定) axios を利用すると必要な import が多い API クライアントが fetch であれば有力だった可能性がありますが、現状は axios を活用しています。また型のみを生成するのは流用性が高く良いと思っていました。しかし axios に型を渡して矯正すると API コールのために必要な import が増えます。そして型の構造的に必要な型を探り当てるのが面倒でした。 実装例 import { type paths, type operations } from "./generated/schema.d" import { typedAxios } from "./client" // 認証 type Request = operations [ "postV2Authentication" ][ "requestBody" ][ "content" ][ "application/x-www-form-urlencoded" ] type Response = operations [ "postV2Authentication" ][ "responses" ][ "201" ] typedAxios.post< Response >( "/v2/authentication" , { email , password , } ) // ドキュメント取得 type Request = operations [ "getV2DocumentDocumentId" ][ "parameters" ][ "path" ] type Response = operations [ "getV2DocumentDocumentId" ][ "responses" ][ "200" ][ "content" ][ "application/json" ] typedAxios. get < Response >( `/v2/document/ ${ documentId } ` ) 上記はプレーンな axios のため import が多く、型の深堀りが必要です。 それを解消したものも実装しました。OpenAPI 3.0 の構造では HTTP メソッドと endpoint の組み合わせで欲しい API が特定できます。そのため HTTP メソッドと endpoint を渡せばパラメータやリク エス トの型を推論できる axios を実装しました。コードすべてではないですが実装の概要は把握できると思います。 渡す型を最小限にした axios // カスタマイズしたaxios const customAxios = async <M extends Methods, E extends Endpoint< M >>( { methods , endpoint , pathParams , queryParams , body , } : { methods : M endpoint : E pathParams ?: Snake2Camel < PathParams < M >> queryParams ?: Snake2Camel < QueryParams < M >> body ?: Snake2Camel < RequestBody < M , E >> } ): Promise < AxiosResponse < Snake2Camel < SuccessResponse < M , E >>>> => { const dynamicEndpoint = pathParams ? getDynamicEndpoint(endpoint, camel2SnakeDeep(pathParams)) : endpoint const snakeCaseBody = body ? camel2SnakeDeep(body) : body const response = await axios[methods]< SuccessResponse < M , E >>( ` ${ dynamicEndpoint }${ getQueryParams(queryParams) } ` , snakeCaseBody ) return { ...response, data : snake2CamelDeep(response.data), } } // 呼び出しイメージ await customAxios( { methods : "get" , endpoint : "/api/v2/document/{document_id}" , pathParams : { documentId } , } ) 呼び出し時には補完が HTTP メソッド → endpoint → パラメータと順番に絞り込まれるように推論されます。しかし見ての通り実装が複雑です。他のライブラリのように endpoint 毎に関数が生えた方が圧倒的にリーダブルです。また上記を用いて AI にコード生成を指示するとコード生成 → 型エラー → コード生成 を繰り返して徐々に正しいコードに近づけていく様子で、AI の精度が落ちるのも難点でした。 swagger-typescript- api メリット 呼び出しが最もシンプル API クライアント(fetch, axios, …etc)を選択できる デメリット JSON ファイルに特定の文字が含まれると自動生成に失敗する パラメータやレスポンスの型が流用しづらい API コールのインターフェースは最もシンプルでした。しかし 参照元 の JSON ファイルに「*( アスタリスク )」が含まれていると自動生成に失敗します。OpenAPI のコメント等には様々な文字列を使う可能性があるため運用が辛くなる印象です。またパラメータやレスポンスの型が独立して参照できません。型の取り出しが面倒でした。 型の取り出し import { typedAxios } from "./typedAxios" export const getDocument = async ( { documentId , } : // 特定のqueryパラメータが欲しい時にapiの関数から引数の型を抜き出す必要がある。 Parameters< typeof typedAxios.v2.getV2Document >[ "0" ]) => { return await typedAxios.v2.getV2Document( { documentId , } ) } 実装例 import { Api } from "./api" const typedAxios = new Api() // 認証 typedAxios.v2.postV2Authentication( { email , password , } ) // ドキュメント取得 typedAxios.v2.getV2DocumentDocumentId(documentId) ライブラリ比較まとめ 消去法的に openapi-ts を採用しました。 以下の懸念が他ライブラリのノックアウトファクターでした。 openapi-typescript 呼び出し時の認知負荷の高さ swagger-typescript- api JSON ファイルに使われている文字を気にする必要がある 型の流用性が低い 導入の前処理 冒頭にあった課題を払拭するために以下の前処理が必要です。 OpenAPI 2.0(Swagger) → OpenAPI 3.0 の変換 openapi. json の merge snake_case ↔ camelCase の変換 OpenAPI 2.0(Swagger) → OpenAPI 3.0 の変換 API クライアントの自動生成ライブラリは OpenAPI 3.0 形式であることを想定しているため、 前処理として swagger2openapi というライブラリで OpenAPI 2.0(Swagger) → OpenAPI 3.0 へ変換しました。 npx swagger2openapi swagger.json -o openapi.json 1 コマンドでキレイに 2 系 →3 系になってくれて嬉しかったです。 openapi. json の merge openapi-ts が読み込める JSON ファイルは 1 つなので、2 つの API 定義 JSON を merge します。openapi. json の中には様々なプロパティがありますが、merge したいのは以下の 2 つです。 path: 各 endpoint と HTTP method 等の情報が定義 components: 具体的な スキーマ を内包 上記を単純に merge することで欲しい json が手に入りました。 import * as openapiJson1 from "openapi-1.json" import * as openapiJson2 from "openapi-2.json" import fs from "fs" /** * openapi.jsonをマージして新規ファイルとして出力 */ const mergedJson = { ...openapiJson1, paths : { ...openapiJson1.paths, ...openapiJson2.paths, } , components : { ...openapiJson1.components, ...openapiJson2.components, } , } fs.writeFileSync( "merged.json" , JSON . stringify (mergedJson, null , 2 )) snake_case ↔ camelCase の変換 FE のコーディングスタイルが camelCase なのに対して BE は snake_case です。この乖離については API コールのパラメータ作成時やレスポンス受け取り時に変換をする必要があります。 API コール時に都度変換するのは認知負荷が高いため共通処理に含めることにしました。 共通処理に含めるデメリットとして以下があります。 変換ユーティリティの開発・メンテナンスの手間 ランタイム上の変換処理によるオーバーヘッド しかし上記よりも開発者体験の方が価値があると判断しました。 また重要なポイントとして API クライアントの自動生成前の API 定義 JSON にもケース変換を施しました。つまり API 定義 JSON の時点でパラメータやレスポンスを camelCase にしておきました。これをしないと生成後の API クライアントが型補完で snake_case を要求してしまうので type error になります。openapi. json の時点でケース変換ができると関数の引数と返り値の型としては camelCase で出力してくれます。あとは axios の interceptors に変換処理を入れるだけです。 axios の interceptors import { client } from "client.gen" // 自動生成されたaxiosのクライアント client.setConfig( { baseURL : "/" } ) /** リクエストパラメータをsnake_caseに変換 */ client.instance.interceptors.request.use(( request : InternalAxiosRequestConfig < any >) => { // snake_caseへの変換処理 return request } ) /** レスポンスデータをcamelCaseに変換 */ client.instance.interceptors. response .use(( response : AxiosResponse < any , any >) => { // camelCaseへの変換処理 return response } ) export { client } 本筋から脱線: 型の上書きでケース変換 BE が生成した 参照元 の JSON を FE の都合に合わせて変更してしまうと思わぬ不都合が起きるのではと懸念がありました。そのため型の上書き等も試してみました。 いざ試すと生成後の関数や型に対しての TypeScript 上でのケース変換はしんどいです。例えばパスパラメータの型を snake_case から camelCase に変換するだけでも後述の複雑な型が必要になります。またランタイムで実行されるコードと比較して型に対しての検証は難しいです。そのためこの複雑な実装よりかは JSON を書き換える方が妥当と考えました。 型変換の一部 type Snake2CamelString < T extends string > = T extends ` ${ infer R } _ ${ infer U } ` ? ` ${ R }${ Capitalize< Snake2CamelString < U >> } ` : T // keyをsnake_case → camelCase type Snake2Camel < T > = T extends any [] ? Snake2Camel < T [number]>[] : T extends object ? { [ K in keyof T as Snake2CamelString < string & K >]: Snake2Camel < T [K]> } : T // httpメソッド type Methods = "get" | "post" | "put" | "patch" | "delete" // endpointのURL type Endpoint < M extends Methods > = { [ Key in keyof paths ]: M extends keyof paths [Key] ? Key : never } [keyof paths] // path parameter type PathParams < M extends Methods > = { [ Key in Endpoint < M >]: M extends keyof paths [Key] ? paths [Key][M] extends { parameters : { path : infer T } } ? T extends { [ key : string ]: string | number } ? T : never : never : never } [Endpoint<M>] // 最終的に欲しいpathParamsの型。 // これ以外にもqueryParamsやrequestBody,responseBodyにも似たようでちょっと違う変換をするgenericが必要 Snake2Camel< PathParams < M >> (脱線終わり。) before / after 今までは API コールの前後にクラスを通していました。 API クライアント自動生成後は関数を呼ぶだけでシンプルです。BE で破壊的変更も type error として検知できます。 今までの呼び出しイメージ import { repositoryFactory } from "./repositoryFactory" // parameterのバリデーションやケース変換 import { DocumentEntityClass } from "./documentEntityClass" // responseのケース変換やオブジェクト化 const documentGetRequest = repositoryFactory . documentDiff . getParam () documentGetRequest . documentId = documentId const documentEntity = new DocumentEntityClass () const response = await repositoryFactory . documentDiff . get ({ documentGetRequest }) documentEntity = response . data 新しい API コール import { typedAxios } from "./client" import { getV2DocumentDocumentId } from "./generated/sdk.gen" getV2DocumentDocumentId( { client : typedAxios, path : { documentId } , } ) まとめ API 定義 JSON から型だけを出力しても、認知負荷の低い API コールの実現は難しいことが分かりました。型のみでは結局、認知負荷を下げるために共通処理に複雑さが必要になってしまいます。 共 通化 するのではなく、シンプルな成果物に変換できる機構が必要でした。頑張って共 通化 し、インターフェースがシンプルになれば実装が捗ると思っていましたが、複雑さのシワ寄せとして AI のコード生成精度に影響するという気づきも得ることができました。 また今回の選定においては早めにプロトタイプを実装したことが良かった点だと振り返って思います。やりたいことや実現したいことの中核はぼんやりありましたが、実際に手を動かしてみることで比較するべき観点や実装イメージが具体化されました。ドキュメントに記載のない思わぬ欠点を早めに検知したことも収穫でした。 ご精読ありがとうございました。こうした技術的な意思決定のプロセスや、MNTSQ の日々の開発の進め方にご興味を持っていただけた方は、ぜひお気軽にカジュアル面談でお話ししましょう。 careers.mntsq.co.jp
アバター
小ネタです。そして掲題が全てを語っています。 以下、ECS on EC2 構成の ECS サービスにおいて ECS タスクを動作させるプラットフォームとなる EC2 インスタンス を ECS コンテナ インスタンス と呼称します。これは Launching an Amazon ECS Linux container instance へ微妙に倣っての呼び方になります。 3行で ECS on EC2 構成の ECS サービスで GuardDuty ECS Runtime Monitoring を有効化する場合、ECS サービスの更新は必要ない ECS コンテナ インスタンス で GuardDuty エージェントが動作し EC2 Runtime Monitoring の要件を満たせれば、その時点で ECS Runtime Monitoring も有効になる ECS コンテナ インスタンス での EC2 Runtime Monitoring 導入には GuardDuty エージェントが要求する制約がいくつかある なお本稿が伝えたいことは末尾の 追伸 で全て事足ります。 背景 現在 MNTSQ では SRE を中心にプロダクトセキュリティの向上施策を進めており、その中で GuardDuty の利用範囲拡充も目論んでいます。 ここで白羽の矢が立ったのが GuardDuty の機能のひとつである Runtime Monitoring です。詳細は AWS 公式ドキュメント( GuardDuty Runtime Monitoring ) に譲りますが、EC2 / EKS / ECS において インスタンス やタスクの振舞いを内部から観測し、脅威となりうる挙動の検出が可能なサービスです。 サポートされるサービスは前掲ドキュメントにもある通り EKS / ECS / EC2 で、EKS 以外は MNTSQ のワークロードにも合致します。 さて Runtime Monitoring の導入ですが、この方法にはいくつかの経路があります。 Enabling GuardDuty Runtime Monitoring にその全容があり、実に多様な手法が用意されていることが伺えます。MNTSQ では 全ての AWS アカウントは AWS Organizations で管理している GuardDuty 管理用に delegated admin として設定された AWS アカウント( AWS Organizations 配下だが organizations 管理アカウントではない)が存在する 導入対象は ECS とし、EC2 は追って導入を検討する ECS Runtime Monitoring 有効化にかかる手間は最小限のものとしたい。自動導入の仕組みがあれば積極的にこれを使いたい ただし有効化にかかる影響範囲のコン トロール を行いたいので Runtime Monitoring を有効にする AWS アカウントは明示的に選択したい Runtime Monitoring を有効にする決定をした AWS アカウント内では全ての ECS クラスタ を一律対象とする という背景があり、以下の要領で有効化を進めることにしました。 Runtime Monitoring 有効化対象のアカウントは明示的に指定する Enabling Runtime Monitoring for multiple-account environments の "For selective active member accounts only" で有効化設定をする GuardDuty エージェントの導入は自動でやってもらう Managing automated security agent for Fargate (Amazon ECS only) に解説があるとおり、Runtime Monitoring 有効化設定を投入した後に ECS サービスを更新すれば自動で GuardDuty エージェントが サイドカー コンテナとして起動してくる なお、ECS on Fargate 構成において GuardDuty エージェントが サイドカー コンテナとして起動する際、 ECS タスク定義への変更は特段発生しません 。 ECS サービス / タスク定義の範囲外の箇所で aws-guardduty-agent- という接頭辞の名称をもつコンテナが自動で起動してくるようになります。詳細は How Runtime Monitoring works with Fargate (Amazon ECS only) の "GuardDuty adds a sidecar container" の節に説明があります。 さて2025年6月現在、MNTSQ で扱う ECS サービスでは ECS on Fargate ECS on EC2 の2種類の構成をとるものがあります。ECS サービスの数で言えば ECS on Fargate が圧倒的に多く、ECS on EC2 は一部用途(主に GPU を利用したい向き)で使われるのみです。 前述ドキュメントに従い ECS Runtime Monitoring の導入をすすめると、ECS on Fargate に関しては ECS サービス更新後に Reviewing runtime coverage statistics and troubleshooting issues に示される手法にて healthy(= GuardDuty エージェントが稼動し Runtime Monitoring の動作も開始した)なことが確認できるようになりました。 実際の runtime coverage 画面。伏字が多い点はご容赦ください いっぽうでこの手法では ECS on EC2 構成の ECS サービスでは coverage が unhealthy のままになってしまう という気付きも得られました。さてどうしたことでしょう。 ECS Runtime Monitoring を ECS on EC2 構成で healthy にする ECS と EC2 のそれぞれで Runtime Monitoring がどのように動作するかは以下ドキュメントに示されています。 How Runtime Monitoring works with Amazon EC2 instances How Runtime Monitoring works with Fargate (Amazon ECS only) いずれも GuardDuty エージェントが動作していることが前提で、EC2 の場合は Systemd ユニットとして、ECS の場合は サイドカー コンテナとして動作します 今回目指したい ECS on EC2 構成の場合でも Fargate ではないとはいえ ECS ではあるので、 サイドカー コンテナとして GuardDuty エージェントは動作するのではないかと考えるのは自然なはずです。少なくとも本稿筆者はそう考えました。しかしながら実際に作業をしてみると 作業 ECS on Fargate で Runtime Monitoring が healthy になった ECS on EC2 で Runtime Monitoring が healthy になった GuardDuty 側で ECS Runtime Monitoring を有効にした × × エージェントの自動導入を有効にした × × ECS タスクの入れ替えをした × × ECS サービスを更新した ○ × ECS コンテナ インスタンス で GuardDuty エージェントを動作させた ×(関係なし) ○ という格好になりました。つまり ECS on EC2 構成の場合は ECS ではなく EC2 側での Runtime Monitoring 対応が必要 という洞察が得られました。 ちなみにこのとき ECS サービス上では サイドカー コンテナとしての GuardDuty エージェントは稼動せず、EC2 インスタンス 上で動作する GuardDuty エージェントがその役目を担っている模様です。 ECS コンテナ インスタンス で GuardDuty エージェントを動作させる どこで何が必要になるかが判れば話は早いです。事前準備としては以下を参照すればよいでしょう。 How Runtime Monitoring works with Amazon EC2 instances Prerequisites for Amazon EC2 instance support 早い話が以下です。 EC2 インスタンス プロファイルで SSM の Run Command によるコマンド実行が許可されるよう権限設定を行う 新しめの Linux カーネル が利用可能な状態で EC2 インスタンス を稼動させる Linux カーネル のバージョンが見落されがちなので注意が必要です。筆者は見落しました。 ECS コンテナ インスタンス を動作させる場合、おおよそのケースでは ECS-optimized AMI が利用されると思います。MNTSQ でも例に漏れず ECS-optimized AMI を使用し ECS コンテナ インスタンス を稼動させています。この ECS-optimized AMI で上記ドキュメントに示される Linux カーネル 5.4 以上のもの *1 を使う方法を考える必要があります。 最も簡単なのは Amazon ECS-optimized Linux AMIs や Retrieving Amazon ECS-optimized Linux AMI metadata で案内されている、 Linux カーネル 5.10 を標準で使用する ECS-optimized AMI に差し替えてしまうというものでしょう。弊社でもこの差し替えを行うことで対応としました。 まとめ ECS on EC2 構成の場合は ECS ではなく EC2 側での Runtime Monitoring 対応が必要 という点、早々に気付ければ話は早かったのですが、「ECS が対象なのだから ECS 向けの有効化作業に何か抜け漏れがあるはずだ」と執着してしまい、試行錯誤をする羽目になりました。Runtime Monitoring に関する AWS 公式ドキュメントのうち ECS に言及されるものはほぼ全て ECS on Fargate が対象の模様で、ECS on EC2 構成に関しての言及が見られない点も少々難儀する箇所だったように思います。プラットフォームを自前で管理する場合の観点を今一度鍛えようと思える機会になりました。 ECS on EC2 構成で ECS Runtime Monitoring が一向に有効化できないという状況にお困りの方の一助となれば幸いです。 MNTSQ 株式会社 SRE 秋本 追伸 ECS on EC2 構成の場合は ECS ではなく EC2 側での Runtime Monitoring 対応が必要 という点、本稿筆者が本作業を行った際には先行情報となるものを見付けられず、本記事が有意義なものになると妄想していました。しかし今本記事を書きつつ探してみたところ、 Turning on Runtime Monitoring for Amazon ECS という ECS のドキュメント(GuardDuty のドキュメントではない) で ECS on EC2 向けの解説がありました。もっと早く知りたかった……。 *1 : ECS-optimized AMI は Amazon Linux 2 か Amazon Linux 2023 がベースになっているので、 Ubuntu や Debian 向けの情報として扱われている内容には触れていません
アバター
こんにちは、MNTSQで アルゴリズム エンジニアをやっている平田です。 MNTSQではAIで企業の契約業務を変革するプロダクトを開発しています。 mntsq.co.jp ところでみなさん、 MCP (Model Context Protocol)使っていますか? 2024年11月にAnthropicがMCPを提唱 してから半年しか経っていないのに、 MCP を取り巻くAIエージェント開発のエコシステムは爆発的なスピードで成長を遂げています。 (実際、この記事を書いている最中にアップデートがあって、何度か書き直しています🫠) 先日 MCP がStreamable HTTPをサポートしたため、MNTSQでも自社プロダクトへの MCP 導入を検討し始めました。 Streamable HTTPではサーバーをステートレスにできるので、 アーキテクチャ がシンプルになり、水平スケーリングが容易になります。これはMNTSQのような SaaS での MCP 活用において非常に重要です。 この記事では、具体的なアプリケーションの実装を通じて、 SaaS での利用を想定した MCP の使い方を学びます。 アプリケーションの主な要件は次の通りです: MCP サーバーをステートレスにする : Streamable HTTPでステートレスな MCP サーバーを構築します。これにより、SSE(Server-Sent Events)よりも アーキテクチャ がシンプルになり、保守性やスケーラビリティが向上します。 生成されたツールの情報を検証する : Function Callingで誤ったリソースにアクセスすることを防ぐため、ツールの自動実行を無効化して、生成されたツールの情報を検証・修正してから実行します。 Gemini API を使う : MNTSQの契約データを扱うには、非常に長いコンテキストウィンドウを持つGeminiが適しています。 LangChainを使わない : LangChainは便利ですが、実際には使用しない機能が依存関係に含まれます。依存関係の 脆弱性 や競合によるメンテナンスコストを下げるため、シンプルかつ軽量な構成を保ちます。 アプリケーションの概要 MCP サーバーで提供するツールは何でも良いので、今回はシンプルにElasticsearchをバックエンドとするRAGアプリケーションを実装します。 リポジトリ この記事で紹介する ソースコード や実行方法はすべて次の リポジトリ にあります。 github.com アーキテクチャ graph TD A[Application] -->|ツール実行| M[MCPサーバー<br/>(Streamable HTTP)] A -->|Function Calling| G[Gemini API] M -->|データ取得/検索| E[Elasticsearch] class A appClass class M mcpClass class G apiClass class E dbClass 処理の流れ sequenceDiagram participant App as Application participant Gemini as Gemini API participant MCP as MCPサーバー Note over App: 初期化 App->>MCP: セッション開始 Activate MCP App->>MCP: ツール一覧取得 (list_tools) MCP-->>App: 利用可能なツール一覧<br/>(get_indices, get_mapping, search) Note over App: 初回リクエスト App->>Gemini: コンテンツ生成リクエスト<br/>(プロンプト + ツール定義) Gemini-->>App: レスポンス loop レスポンスにfunction_callが含まれる限り Note over App: Function Calling App->>MCP: ツール実行 MCP-->>App: ツール実行結果 Note over App: 2回目以降のリクエスト App->>Gemini: 次のコンテンツ生成リクエスト<br/>(履歴 + ツール実行結果) Gemini-->>App: レスポンス end Note over App: 最終回答 App->>App: 最終回答を出力 App->>MCP: セッション終了 Deactivate MCP 例えば、「昨日の売上をカテゴリ別に集計してください。」というプロンプトに対して、次のように動作します。 セッション開始 MCP サーバーからツール一覧を取得 ツール get_indices によりインデックス一覧を取得 ツール get_mapping により kibana_sample_data_ecommerce の マッピング を取得 ツール search によりプロンプトをElasticsearchの DSL に変換して検索を実行 検索結果から回答を生成 回答例: 昨日のカテゴリ別の売上は以下の通りです。 * **Men's Clothing**: 3999.13 * **Women's Clothing**: 3924.91 * **Women's Shoes**: 3360.66 * **Men's Shoes**: 2197.89 * **Men's Accessories**: 1669.72 * **Women's Accessories**: 1292.59 解説 MCP サーバー MCP サーバーの実装は、主に MCP Python SDK の公式ドキュメントを参考にしています。 github.com MCP サーバーは、次の3つのツールを提供します。 get_indices : インデックス一覧取得 get_mapping : マッピング 取得 search : 検索 @ mcp.tool (description= "Elasticsearchで検索を実行するツール" ) def search (index: str , query_body: dict [ str , Any], ctx: Context) -> Any: logger.info( "search tool called" ) logger.debug(f "index: {index}, query: {query_body}" ) es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.search(index=index, body=query_body) return response @ mcp.tool (description= "Elasticsearchのインデックスを取得するツール" ) def get_indices (ctx: Context) -> Any: logger.info( "get_indices tool called" ) es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.indices.get_alias( "*" ) return response @ mcp.tool (description= "Elasticsearchで指定したインデックスのマッピングを取得するツール" ) def get_mapping (index: str , ctx: Context) -> Any: logger.info( "get_mapping tool called" ) logger.debug(f "index: {index}" ) es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.indices.get_mapping(index=index) return response ステートレスを有効化するには FastMCP で stateless_http=True を指定します。 mcp = FastMCP(name= "SearchServer" , stateless_http= True , lifespan=lifespan) MCP サーバーは FastAPI にマウントできます。 app = FastAPI(lifespan=lifespan) app.mount( "/search" , search.mcp.streamable_http_app()) MCP クライアント MCP クライアントの実装は、主に Google Gen AI SDK ( google-genai==1.19.0 )の公式ドキュメントを参考にしています。 ai.google.dev Google Gen AI SDK の公式ドキュメントとの差分は次の3点です。 stdio_client ではなく streamablehttp_client を使う ツールの自動実行を無効化する 最終回答に至るまでFunction Callingを繰り返す Streamable HTTPを使用するため、 streamablehttp_client を使ってセッションを開始します。 async with streamablehttp_client( f "http://{mcp_server_host}:{mcp_server_port}/search/mcp/" ) as ( read_stream, write_stream, _, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() Function Callingのために、 MCP サーバーで提供されるツールの情報と、ツールの自動実行を無効化する設定をGemini API に渡します。 MCP サーバーで提供されるツールの情報は、 tools=[session] で渡します。( google-genai==1.15.0 以前は types.FunctionDeclaration オブジェクトに変換する必要がありました。) ツールの自動実行を無効化する設定は、 automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True) で渡します。無効化する理由は、主に認可の目的で、生成されたツールの情報を検証・修正してから実行するためです。 config = types.GenerateContentConfig( system_instruction=SYSTEM_INSTRUCTION, temperature= 0 , tools=[session], # type: ignore[arg-type] automatic_function_calling=types.AutomaticFunctionCallingConfig(disable= True ), ) ツールの自動実行を無効化しているので、ツールは自分で実行する必要があります。ツールの実行に関する情報(ツール名と引数)は response.candidates[0].content.parts[-1].function_call に出力されます。 Gemini API の公式ドキュメントでは parts[0] を使用していますが、次のように text と function_call が混在する場合があるため parts[-1] を使用するほうが確実です。この順番は Google が保証するものではないですが、ツールの自動実行を無効化するとユーザーの types.FunctionResponse を待ち受ける状態になるので、末尾になると考えるのが自然です。 { " parts ": [ { " text ": " 昨日の売上をカテゴリ別に集計するために、まず売上情報が含まれていそうなインデックスを特定する必要があります。 \n 利用可能なインデックスをリストアップするために、`get_indices()` を実行します。 \n " } , { " function_call ": { " args ": {} , " name ": " get_indices " } } ] } ツールは MCP サーバーが提供しているので、 MCP サーバーにリク エス トしてツールを実行します。 # NOTE : ツールの引数を修正する場合は、 `function_call.args` を直接書き換える # function_call.args["index"] = "kibana_sample_data_ecommerce" result = await session.call_tool(function_call.name, arguments=function_call.args) まとめ この記事では、RAGの実装を通じて、Gemini Function Callingと MCP Streamable HTTPの使い方を学びました。 MCP がStreamable HTTPをサポートしたおかげで、 MCP サーバーの アーキテクチャ がシンプルになりました。 MCP が JSON -RPCを採用しているおかげで、生成されたツールの情報を柔軟に検証・修正することができます。 余談ですが、実装したアプリケーションを実際に動かしてみると、「売上」に対してちゃんと kibana_sample_data_ecommerce インデックスを参照したり、 DSL を教えていないのにElasticsearchという情報だけで正しくクエリを生成したりと、Geminiの生成能力に驚かされます。 MCP を取り巻くエコシステムはまだ発展途上なので、この記事の情報もすぐ古くなってしまうかもしれませんが、みなさんのAIエージェント開発の一助となれば幸いです。 もしMNTSQの仕事にご興味を持っていただけたら、ぜひ以下のページもご覧ください。 careers.mntsq.co.jp note.mntsq.co.jp
アバター
はじめに 構成 実装してみる EventBridge EventBridge API destinationsの設定 オートスケールイベントを拾うためのEventルールを作成する Datadog Slack連携の設定 Event Monitorの設定 おわりに はじめに ECSのオートスケールは、一度設定してしまえば非常に便利ですが、人の手を離れて安定運用に乗せるまでには様々な技術的なハードルがあります。安定運用に入るまでは、現在の設定は妥当なのかを判断するため、オートスケールが発生したことを何らかの方法で人間が把握し、日々改善を重ねていくことが必要不可欠です。そこで今回は、ECSのオートスケールイベントをEventBridgeで拾い、Datadogに連携してSlack通知する仕組みを実装してみます。 構成 全体の構成は以下の通りです。ECS クラスタ やサービスを限定せずに、オートスケールイベントが発生した時に通知を行うような汎用的な仕組みを作っていきます。 オートスケールのイベントをEventBridgeで拾う Datadog Event Management API からDatadogに連携する Datadog Event Monitorにてアラートの設定を行う Slackにメッセージを送信する 実装してみる EventBridge EventBridge API destinationsの設定 EventBridgeでは、 API destinations(API送信先) を設定することによって、 AWS 内のイベントを任意の API に連携することができます。 以下のサンプルコードにて、 API destinationsおよび必要なIAMリソースを作成します。DATADOG_ API _KEYはあらかじめSecrets Managerに登録されているものとしています。 サンプルコード # IAM data "aws_iam_policy_document" "eventbridge_sts" { statement { effect = "Allow" actions = [ "sts:AssumeRole" ] principals { type = "Service" identifiers = [ "events.amazonaws.com" ] } } } data "aws_iam_policy_document" "eventbridge_datadog" { statement { effect = "Allow" actions = [ "events:InvokeApiDestination" ] resources = [ aws_cloudwatch_event_api_destination.datadog.arn ] } statement { effect = "Allow" actions = [ "secretsmanager:DescribeSecret" , "secretsmanager:GetSecretValue" , ] resources = [ "arn:aws:secretsmanager:*:*:secret:events!connection/<DATADOG_API_KEYのシークレット名>/*" ] } } resource "aws_iam_role" "eventbridge_datadog" { name = "eventbridge_datadog_role" assume_role_policy = data.aws_iam_policy_document.eventbridge_sts.json } resource "aws_iam_role_policy" "eventbridge_datadog" { name = aws_iam_role.eventbridge_datadog.name role = aws_iam_role.eventbridge_datadog.id policy = data.aws_iam_policy_document.eventbridge_datadog.json } # あらかじめSecretsManagerにDATADOG_API_KEYを登録しておくこと data "aws_secretsmanager_secret_version" "datadog_api_key" { secret_id = "<DATADOG_API_KEYのシークレット名>" } # Connection resource "aws_cloudwatch_event_connection" "datadog" { name = "datadog-event-api" authorization_type = "API_KEY" auth_parameters { api_key { key = "DD-API-KEY" value = data.aws_secretsmanager_secret_version.datadog_api_key.secret_string } } } resource "aws_cloudwatch_event_api_destination" "datadog" { name = "datadog-event-api" connection_arn = aws_cloudwatch_event_connection.datadog.arn invocation_endpoint = "https://api.datadoghq.com/api/v1/events" http_method = "POST" invocation_rate_limit_per_second = 10 } EventBridgeのマネジメントコンソール左側のツリーから「 API の 送信先 」および「接続」を確認し、"datadog-event- api "のリソースが作成されていたらOKです。 オートスケールイベントを拾うためのEventルールを作成する ECSのオートスケールイベントは、一例としては以下のように、" aws .ecs"の"UpdateService"イベントを呼び元の"userIdentity"で絞ることによって判別することができます。" aws .application-autoscaling"のイベントもありますが、こちらで設定してもスケールの最大値に達した時しかトリガされないようです。ただでさえ動作確認が大変なところなので、気をつけましょう。 { " detail-type ": [ " AWS API Call via CloudTrail " ] , " source ": [ " aws.ecs " ] , " detail ": { " userIdentity ": { " invokedBy ": [ " ecs.application-autoscaling.amazonaws.com " ] } , " eventSource ": [ " ecs.amazonaws.com " ] , " eventName ": [ " UpdateService " ] } } ※今回やってはいけないイベントパターンの設定(スケール最大値に達した時しか発生しない) { " source ": [ " aws.application-autoscaling " ] " detail-type ": [ " Application Auto Scaling Scaling Activity State Change " ] } ターゲットには、先ほど作成した API destinationsを指定します。 また、拾ったイベントの json を API のリク エス トボディによしなに整形するため、 入力トランスフォーマ を以下のように設定します。 入力パスで使用したい変数をバインドし、入力テンプレートでリク エス トボディを作成しています。 ※ 入力パス { " newDesiredCount ": " $.detail.requestParameters.desiredCount ", " service ": " $.detail.requestParameters.service " } ※ 入力テンプレート { " title ": " ECS Cluster Auto Scaling ", " source_type_name ": " amazon ecs ", " alert_type ": " info ", " text ": " %%%`<service>` is auto scaled. \n New desiredCount: <newDesiredCount> %%% ", " tags ": [ " environment:development ", " source:amazon_ecs ", " aws_account:************ ", " new_desired_count:<newDesiredCount> ", " ecs_service_name:<service> " ] } これらの設定のサンプルコードです。 サンプルコード # ECSサービスのオートスケールを検知して通知するイベント resource "aws_cloudwatch_event_rule" "ecs_service_autoscale" { name = "ecs-service-autoscale" event_pattern = jsonencode ( { detail-type = [ "AWS API Call via CloudTrail" ] source = [ "aws.ecs" ] detail = { userIdentity = { invokedBy = [ "ecs.application-autoscaling.amazonaws.com" ] } eventSource = [ "ecs.amazonaws.com" ] , eventName = [ "UpdateService" ] } } ) } # API destinationsをターゲットに設定 resource "aws_cloudwatch_event_target" "ecs_service_autoscale" { rule = aws_cloudwatch_event_rule.ecs_service_autoscale.name arn = aws_cloudwatch_event_api_destination.datadog.arn role_arn = aws_iam_role.eventbridge_datadog.arn input_transformer { input_paths = { newDesiredCount = "$.detail.requestParameters.desiredCount" , service = "$.detail.requestParameters.service" } # jsonencodeが特殊文字をエスケープしてしまうので、ヒアドキュメントを使用する input_template = <<EOF { "title": "ECS Service Auto Scaling", "source_type_name": "amazon ecs", "alert_type": "info", "text": "%%%`<service>` is auto scaled.\n New desiredCount: <newDesiredCount> %%%", "tags": [ "environment:development", "source:amazon_ecs", "aws_account:**********", "event_name:ecs_service_autoscaled", "new_desired_count:<newDesiredCount>", "ecs_service_name:<service>" ] } EOF } } Eventルールが作成されたら、何らかの方法でECSサービスをオートスケールさせてみましょう。DatadogのEvent Explorer ( https :// .datadoghq.com/event/ explorer )にて以下のようなイベントが飛んできていることを確認します。"event_name:ecs_service_autoscaled "でクエリをすれば出てくるはずです。 ※ イベントがDatadogに飛んでこない時 イベントが飛ばない時は以下を行いましょう EventルールのターゲットにCloudWatchを設定 コンソールからEventルールを選択し、「ターゲット」のタブから「編集」でCloudWatchロググループを追加します。追加の権限などは不要なはずです。イベントが正しく拾えていれば、設定したロググループにログストリームが生成されているはずです。また、ロググループからイベントの詳細を確認できるので、入力テンプレートの情報を充実させたい場合などにもこちらを参照します。 Eventルールの API destinationsのターゲットにDLQを設定する コンソールからEventルールを選択し、「ターゲット」のタブから「編集」-> 「ターゲットを選択」のページに移動します。ターゲットは複数設定できますが、 API destinationsのものを選び、「追加設定」から以下のようにDLQを設定します。Datadogの API を読んだ時にエラーが返ってきていた場合は、設定したキューを確認することでレスポンスを確認できます。 Datadog Slack連携の設定 Slack通知を行うためには、あらかじめDatadogとSlack側で簡単な設定が必要です。本記事では設定方法の説明は割愛するので、公式ドキュメントを参考に設定を行ってください。 docs.datadoghq.com Event Monitorの設定 Datadogコンソールの左側のツリーから「Monitors」を選択し、遷移後画面の左上にある「+New Monitor」をクリックします。(もしくはブラウザに https :// .datadoghq.com/monitors/createを入力)作成画面では「Event」を選択します。 1. Define the search query を以下のように設定します クエリの欄に"event_name:ecs_service_autoscaled "を入力 "ecs_service_autoscaled"は入力テンプレートで独自定義したものなので、これで目的のイベントのみを拾います by句に"new_desired_count"と"ecs_service_name"を入力 サービス, スケールイベントごとに通知が飛ぶようにマルチモニター化します。また、ここでby句に指定したTagsのみが通知メッセージに埋め込み可能になります。 2. Set alert conditions を以下のように設定します Trigger when the evaluated value is " above or equal to "に設定 イベント発生を検知したいので データ点1 以上 でWarnアラートを飛ばせるように設定します Alert thresholdを適当な大きな値に設定 オートスケールイベントの通知はWarnレベルで行いたいので、Alertレベルの通知は行わないように、適当に大きな値にします。(本当はInfoレベルにしたいですが、2025/06現在ではDatadogの仕様上不可能と回答をサポートの方にいただきました) Warning thresholdを1に設定 イベントが起きる毎に通知を行いたいので、1に設定します 3. Configure notifications & automations をお好みで設定します 以下は設定例です。{{ }}で囲んである部分には、「EventのTagsに設定されている」かつ「by句で指定している」値のみ埋め込めます。通知をリッチにしたい場合は、 AWS のEventルール側で入力テンプレートの"tags"を充実させ、DatadogのEvent Monitor側のby句でも使用したいTagsを指定しましょう。また、メッセージ全体を{{#is_warning}}{{/is_warning}}で囲えば、Warn状態からの リカバリ 時の通知が飛ばなくなります。 以上で、オートスケールイベントが起きた時に、以下のようにSlackに通知が飛ぶようになりました。 おわりに とりあえずEventBridge -> Datadog -> Slackの通知ができそうだということで仕組みを作成してみましたが、思ったよりも通知がゴチャついてしまったなという印象があります。(通知タイトルに "on ecs_service_name:~"ってついてしまうなど)もしかしたら、以前紹介した Amazon Q Developer in chat applications(旧: AWS Chatbot)を使用した仕組み の方が、スマートに通知できたような気もしています。とはいえオートスケールイベントを通知するという目的は達成しており、監視・モニタリング系の管理をDatadogに集約することには運用上のメリットもあるので、そことの兼ね合いでもあると思います。ここら辺は今後のDatadogのアップデートにも期待ですね。 MNTSQ株式会社 SRE 西室
アバター