TECH PLAY

MNTSQ

MNTSQ の技術ブログ

91

はじめに MNTSQ はそのサービスの性質(「契約」の集約、一元管理、活用)上、セキュリティの維持と向上が至上命題です。 セキュリティへの取り組みには幾つかのアプローチがありますが、何が不足しているのか、どういった対処が必要かという点を突き止めるには情報が必要です。これはどういったアプローチを取るにしても共通して重要な観点と思います。 本稿はこの情報の獲得のためのログ収集範囲の拡充を行った記録となります。対象は Route 53 の DNS クエリログです。 なぜ DNS クエリログを取るか DNS クエリログはその名前の通り DNS へのクエリのログです。つまり いつ 誰が 何を どこから(「誰が」と同一の情報になる場合あり) が DNS クエリ単位で得られます。Route 53 で得られる DNS クエリログには以下2種類があります。 公開 DNS クエリログ: Public DNS query logging - Amazon Route 53 インターネットからの Route 53 公開 (public) hosted zone に対して発行された DNS クエリに関するログ リ ゾル バクエリログ: Resolver query logging - Amazon Route 53 VPC 内からインターネットに向けて発行された DNS クエリに関するログ VPC に紐付く Route 53 非公開 (private) hosted zone の名前解決は Route 53 リゾルバ が担い、ログ出力もこれが担う つまり Route 53 においては上述 DNS クエリログを インターネット → Route 53 公開 hosted zone(公開 DNS クエリログ) VPC → インターネット(リ ゾル バクエリログ) の2方向に関して収集することができます。これによって得られる情報はいくつか考えられますが、 インターネット → Route 53 公開 hosted zone 所謂 attack surface を狙われている形跡の確認 意図しないホストに対してのリク エス トが継続していないか等、接続エラーの確認 VPC → インターネット VPC 内から意図しない通信が発生していないかの確認 といったものがパッと思い付くだけでも挙げられます。実際にログを取ってみて初めて気付ける活用法もあるはずなので、まずはログを取ることを目的としてもよいでしょう。 DNS クエリログ収集構成 構成図を以下に示します。 AWSACC1 = Route 53 リソース稼動アカウント(図では1つだが実際には複数存在) Analysis = ログ分析用アカウント(1つのみ存在) Route 53 が生成する各 DNS クエリログを最終的には専用の AWS アカウント内に用意した S3 バケット に集約し、当該アカウントの Athena からログを解析する構成となります。 ログを必ずしも専用の AWS アカウントに集約する必要は無いのですが、今回は Athena でのログ検索時の利便性の面から、ログ集約先および活用場所をひとつの場所に絞るようにしました。S3 上にログを集約する取り組みが DNS クエリログについては初であった点も保存先選定の柔軟さに一役買っています。 図から判るとおり、 DNS クエリログによって S3 への保存方法が異なります。 リ ゾル バクエリログはログ出力先を複数選べ、選択肢の中には S3 がデフォルトで存在します *1 。 一方で公開 DNS クエリログについては CloudWatch Logs 以外にログを出力する選択肢はありません *2 。また CloudWatch Logs ロググループは us-east-1 にあるものだけが利用可 という制約もあります。 弊社でログ検索用に整備している Athena とその関連リソースは ap-northeast-1 にあることを前提にしているので、ここは出来れば ap-northeast-1 に寄せたいところです。このあたりを踏まえて公開 DNS クエリログについては Data Firehose を使い us-east-1 内で CloudWatch Logs から S3 へログを移設 S3 レプリケーション で us-east-1 から ap-northeast-1 へリージョンを跨いで最終目的地となる S3 バケット へログを保存 という構成をとるようにしました。 Terraform コード Route 53 ログを生成する側を submitter、ログを最終的に保管し Athena で検索する側を receiver とし、2つのコードを例示します。 前述の構成図でいえば AWSACC1 に相当するものが submitter、 Analysis に相当するものが receiver になります。 いずれも実際に使っているコードを改変しての例示となります。 submitter 以下を実施するコードです。Route 53 各ゾーン (private / public) および VPC は既に存在するものとします。 リ ゾル バクエリログを receiver 側 S3 バケット として保存 公開 DNS クエリログを us-east-1 の CloudWatch Logs ロググループに保存 us-east-1 にある CloudWatch Logs ロググループの内容を us-east-1 の S3 バケット へ Data Firehose を使い送出 後述の Athena でのログ解析の都合で dynamic partitioning ( https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html ) を有効にしています us-east-1 にある S3 バケット の内容を receiver 側の ap-northeast-1 下 S3 バケット へレプリケート main.tf data "aws_caller_identity" "current" {} # リゾルバクエリログ収集用コード resource "aws_route53_resolver_query_log_config" "main" { name = var.route53 [ "resolver_query_log" ][ "config_name" ] destination_arn = var.route53 [ "resolver_query_log" ][ "bucket_arn" ] } resource "aws_route53_resolver_query_log_config_association" "main" { resolver_query_log_config_id = aws_route53_resolver_query_log_config.main.id resource_id = var.vpc [ "id" ] } # 公開 DNS クエリログ収集用コード resource "aws_cloudwatch_log_group" "aws_route53_public" { provider = aws.us-east-1 name = var.route53 [ "public_dns_query_log" ][ "log_group_name" ] retention_in_days = 14 # S3 上のログを実運用上は使うので CloudWatch Logs には長期保管する必要がない } data "aws_iam_policy_document" "route53_query_logging" { statement { actions = [ "logs:CreateLogStream" , "logs:PutLogEvents" , ] resources = [ aws_cloudwatch_log_group.aws_route53_public.arn, ] principals { identifiers = [ "route53.amazonaws.com" ] type = "Service" } } } resource "aws_cloudwatch_log_resource_policy" "route53_public_query_logging_policy" { provider = aws.us-east-1 policy_document = data.aws_iam_policy_document.route53_query_logging.json policy_name = "$ { var.route53 [ "resolver_query_log" ][ "keyword" ]} -policy" } resource "aws_route53_query_log" "public" { provider = aws.us-east-1 depends_on = [ aws_cloudwatch_log_resource_policy.route53_public_query_logging_policy ] cloudwatch_log_group_arn = aws_cloudwatch_log_group.aws_route53_public.arn zone_id = aws_route53_zone.public.zone_id } # us-east-1 内で CloudWatch Logs から S3 へログを Data Firehose 経由で移す resource "aws_cloudwatch_log_subscription_filter" "s3_stream_filter" { provider = aws.us-east-1 name = "$ { var.route53 [ "public_dns_query_log" ][ "keyword" ]} -to-firehose" log_group_name = aws_cloudwatch_log_group.aws_route53_public.name # 全ログを転送対象にしたいので filter_pattern は空にする filter_pattern = "" destination_arn = aws_kinesis_firehose_delivery_stream.aws_route53_public.arn role_arn = aws_iam_role.route53_public_query_logs_to_firehose_role.arn } resource "aws_cloudwatch_log_group" "route53_public_firehose_log" { provider = aws.us-east-1 name = "/aws/kinesisfirehose/$ { aws_kinesis_firehose_delivery_stream.main.name } " retention_in_days = 14 # 最終保存先 S3 バケットにレプリケートされたログを実運用上では使うので長期間の保持は不要 } resource "aws_kinesis_firehose_delivery_stream" "main" { provider = aws.us-east-1 name = var.route53 [ "public_dns_query_log" ][ "keyword" ] destination = "extended_s3" extended_s3_configuration { role_arn = aws_iam_role.route53_public_query_logging_role.arn bucket_arn = aws_s3_bucket.aws_route53_public.arn buffering_size = 64 # MB 単位。dynamic partitioning が有効の場合必須 /* Data Firehose 内で Route 53 公開 DNS ログを Athena が解釈できる形式に変換する。詳細は後述 実施している内容は以下のとおり - CloudWatch Logs から Data Firehose に流れてくるログは gzip 圧縮されているのでこれを展開 - 展開した内容は1行に複数の JSON オブジェクトが含まれる形式になっているので jq を使い1行1オブジェクトになるよう展開 - S3 レプリケーションの事情で Data Firehose から S3 へログデータを置く場合は適当な prefix が欲しいので、これを "logs/" とできるよう設定 */ dynamic_partitioning_configuration { enabled = "true" } processing_configuration { enabled = "true" processors { type = "MetadataExtraction" parameters { parameter_name = "JsonParsingEngine" parameter_value = "JQ-1.6" } parameters { parameter_name = "MetadataExtractionQuery" parameter_value = "{prefix: {dummy: (\"logs\")} | .dummy}" # 実質的に "logs" という固定文字列を返すだけ } } processors { type = "Decompression" parameters { parameter_name = "CompressionFormat" parameter_value = "GZIP" } } processors { type = "AppendDelimiterToRecord" } } prefix = "!{partitionKeyFromQuery:prefix}/$ { data.aws_caller_identity.current.account_id } /!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/" error_output_prefix = "error/$ { data.aws_caller_identity.current.account_id } /!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/!{firehose:error-output-type}/" compression_format = "GZIP" cloudwatch_logging_options { enabled = true log_group_name = aws_cloudwatch_log_group.route53_public_firehose_log.name log_stream_name = "S3Delivery" } } } resource "aws_s3_bucket" "aws_route53_public" { provider = aws.us-east-1 bucket = var.route53 [ "public_dns_query_log" ][ "source_bucket_name" ] } resource "aws_s3_bucket_lifecycle_configuration" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.id rule { status = "Enabled" id = "delete after 180 days" expiration { days = 180 # 集約先 S3 バケット側のログを使うので然程長期間保持しておく必要はない } filter { prefix = "" } } } resource "aws_s3_bucket_versioning" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_public_access_block" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } resource "aws_s3_bucket_server_side_encryption_configuration" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.id rule { apply_server_side_encryption_by_default { sse_algorithm = "AES256" } } } data "aws_iam_policy_document" "aws_route53_public_bucket_policy" { provider = aws.us-east-1 statement { effect = "Allow" actions = [ "s3:PutObject" ] resources = [ "$ { aws_s3_bucket.aws_route53_public.arn } /*" ] principals { type = "Service" identifiers = [ "logging.s3.amazonaws.com" ] } condition { test = "ArnLike" variable = "aws:SourceArn" values = [ "arn:aws:s3:::mntsq-$ { var.env } -*" ] } condition { test = "StringEquals" variable = "aws:SourceAccount" values = [ data.aws_caller_identity.current.account_id ] } } } resource "aws_s3_bucket_policy" "aws_route53_public" { provider = aws.us-east-1 bucket = aws_s3_bucket.aws_route53_public.bucket policy = data.aws_iam_policy_document.aws_route53_public_bucket_policy.json } resource "aws_s3_bucket_replication_configuration" "route53_public_query_logging" { provider = aws.us-east-1 depends_on = [ aws_s3_bucket_versioning.aws_route53_public ] bucket = aws_s3_bucket.aws_route53_public.id role = aws_iam_role.route53_public_query_logging_replication.arn rule { id = "route53-public-dns-query-log-replication" status = "Enabled" filter { prefix = "logs" } delete_marker_replication { status = "Disabled" } destination { account = var.route53 [ "public_dns_query_log" ][ "destination_account_id" ] bucket = var.route53 [ "resolver_query_log" ][ "destination_bucket_arn" ] storage_class = "STANDARD_IA" access_control_translation { owner = "Destination" } } } } provider.tf provider "aws" { region = "ap-northeast-1" } provider "aws" { alias = "us-east-1" region = "us-east-1" } terraform { required_version = "~> 1.11.4" required_providers { aws = { source = "hashicorp/aws" version = "~> 6.0.0" } } } receiver submitter が生成した Route 53 ログを最終的に保管する S3 バケット を管理します。こちらはリージョンを跨がず ap-northeast-1 のみで完結するので、provider.tf の例示は省略します main.tf /* DNS クエリログを収集する対象となる AWS アカウントは AWS Organizations で管理している これらアカウントに対してのアクセス許可(S3 バケットポリシ)を個々設定するのは手間なので、organization 単位で許可できるようにする これには organization ID が要り、その値を得るための data */ data "aws_organizations_organization" "main" {} # リゾルバクエリログの最終保管場所となる S3 バケットとその周辺のリソースを定義 resource "aws_s3_bucket" "route53_resolver_query_logs" { bucket = var.s3 [ "resolver_query_logs" ][ "name" ] } resource "aws_s3_bucket_server_side_encryption_configuration" "route53_resolver_query_logs" { bucket = aws_s3_bucket.route53_resolver_query_logs.id rule { apply_server_side_encryption_by_default { sse_algorithm = "AES256" } } } resource "aws_s3_bucket_lifecycle_configuration" "route53_resolver_query_logs" { bucket = aws_s3_bucket.route53_resolver_query_logs.id rule { id = "transition to archives" transition { days = 30 storage_class = "STANDARD_IA" } transition { days = 60 storage_class = "GLACIER" } filter { prefix = "" } status = "Enabled" } } resource "aws_s3_bucket_public_access_block" "route53_resolver_query_logs" { bucket = aws_s3_bucket.route53_resolver_query_logs.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } data "aws_iam_policy_document" "route53_resolver_query_logs" { statement { effect = "Allow" actions = [ "s3:GetBucketAcl" ] resources = [ aws_s3_bucket.route53_resolver_query_logs.arn, ] principals { type = "Service" identifiers = [ "delivery.logs.amazonaws.com" ] } } statement { effect = "Allow" actions = [ "s3:PutObject" ] resources = [ "$ { aws_s3_bucket.route53_resolver_query_logs.arn } /*" , ] principals { type = "Service" identifiers = [ "delivery.logs.amazonaws.com" ] } condition { test = "StringEquals" variable = "s3:x-amz-acl" values = [ "bucket-owner-full-control" , ] } condition { test = "StringEquals" variable = "aws:PrincipalOrgID" values = [ data.aws_organizations_organization.main.id ] } } } resource "aws_s3_bucket_policy" "route53_resolver_query_logs" { bucket = aws_s3_bucket.route53_resolver_query_logs.id policy = data.aws_iam_policy_document.route53_resolver_query_logs.json } # 公開 DNS クエリログの最終保管場所となる S3 バケットとその周辺のリソースを定義 resource "aws_s3_bucket" "route53_public_dns_query_logging" { bucket = var.s3 [ "public_dns_query_logs" ][ "name" ] } resource "aws_s3_bucket_versioning" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_public_access_block" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } resource "aws_s3_bucket_ownership_controls" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id rule { object_ownership = "BucketOwnerPreferred" } } resource "aws_s3_bucket_server_side_encryption_configuration" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id rule { apply_server_side_encryption_by_default { sse_algorithm = "AES256" } } } data "aws_iam_policy_document" "route53_public_dns_query_logging" { statement { effect = "Allow" actions = [ "s3:ReplicateObject" , "s3:ReplicateDelete" , "s3:ReplicateTags" , "s3:GetObjectVersionTagging" , "s3:ObjectOwnerOverrideToBucketOwner" , ] resources = [ "$ { aws_s3_bucket.route53_public_dns_query_logging.arn } /*" ] principals { type = "AWS" identifiers = [ "*" ] } condition { test = "StringEquals" variable = "aws:PrincipalOrgID" values = [ data.aws_organizations_organization.main.id ] } } statement { effect = "Allow" actions = [ "s3:GetBucketVersioning" , "s3:PutBucketVersioning" , "s3:ListBucket" , "s3:GetReplicationConfiguration" , ] resources = [ aws_s3_bucket.route53_public_dns_query_logging.arn ] principals { type = "AWS" identifiers = [ "*" ] } condition { test = "StringEquals" variable = "aws:PrincipalOrgID" values = [ data.aws_organizations_organization.main.id ] } } statement { effect = "Allow" actions = [ "s3:PutObject" ] resources = [ "$ { aws_s3_bucket.route53_public_dns_query_logging.arn } /*" , ] principals { type = "Service" identifiers = [ "logging.s3.amazonaws.com" ] } condition { test = "StringEquals" variable = "s3:x-amz-acl" values = [ "bucket-owner-full-control" ] } condition { test = "StringEquals" variable = "aws:PrincipalOrgID" values = [ data.aws_organizations_organization.main.id ] } } } resource "aws_s3_bucket_policy" "route53_public_dns_query_logging" { bucket = aws_s3_bucket.route53_public_dns_query_logging.id policy = data.aws_iam_policy_document.route53_public_dns_query_logging.json } 公開 DNS クエリログの取り扱いについての注意 上記サンプルコード内で Data Firehose を使い CloudWatch Logs から S3 へ公開 DNS クエリログを送出する過程で、何やら小難しいことをしている箇所に目が付くと思います。 extended_s3_configuration { role_arn = aws_iam_role.route53_public_query_logging_role.arn bucket_arn = aws_s3_bucket.aws_route53_public.arn buffering_size = 64 # MB 単位。dynamic partitioning が有効の場合必須 /* Data Firehose 内で Route 53 公開 DNS ログを Athena が解釈できる形式に変換する。詳細は後述 実施している内容は以下のとおり - CloudWatch Logs から Data Firehose に流れてくるログは gzip 圧縮されているのでこれを展開 - 展開した内容は1行に複数の JSON オブジェクトが含まれる形式になっているので jq を使い1行1オブジェクトになるよう展開 - S3 レプリケーションの事情で Data Firehose から S3 へログデータを置く場合は適当な prefix が欲しいので、これを "logs/" とできるよう設定 */ dynamic_partitioning_configuration { enabled = "true" } processing_configuration { enabled = "true" processors { type = "MetadataExtraction" parameters { parameter_name = "JsonParsingEngine" parameter_value = "JQ-1.6" } parameters { parameter_name = "MetadataExtractionQuery" parameter_value = "{prefix: {dummy: (\"logs\")} | .dummy}" # 実質的に "logs" という固定文字列を返すだけ } } processors { type = "Decompression" parameters { parameter_name = "CompressionFormat" parameter_value = "GZIP" } } processors { type = "AppendDelimiterToRecord" } } これは Athena でログを処理することを前提とした前処理を Data Firehose のみで(= ログ処理用の Lambda 関数等を噛ませないで)実施する為の措置です。 通常 CloudWatch Logs にある Route 53 公開 DNS クエリログを Data Firehose でシンプルに S3 へ送出すると以下のような改行なしで複数の JSON オブジェクトが1行に集約されたものが得られます(実際のログを適当にマスクし例示します)。 { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/YVR52-R2 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522674731119363142938582278340608 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP YVR52-R2 192.0.2.143 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO6-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522685262133562904066894168588288 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP SFO6-SO1 192.0.2.144 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO9-SN1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522690947843897953596035415605248 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com AAAA NOERROR UDP SFO9-SN1 192.0.2.144 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SEA900-R3 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522692678857209236868848315727872 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP SEA900-R3 2001:DB8::143 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO6-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522703737195603523266279501332480 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP SFO6-SO1 192.0.2.144 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/ATL58-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047077085475805385546231824257833115761174552619646976 "," timestamp ": 1750931493000 ," message ":" 1.0 2025-06-26T09:51:33Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP ATL58-R1 192.0.2.148 192.0.2.0/24 " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/NRT8-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047077085475805385546231840537028970579482687526338560 "," timestamp ": 1750931493000 ," message ":" 1.0 2025-06-26T09:51:33Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP NRT8-SO1 192.0.2.10 - " }]}{ " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/KUL51-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047082905970302202038872078382990508713243113432088576 "," timestamp ": 17509 31754000,"message":"1.0 2025-06-26T09:55:54Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP KUL51-R1 192.0.2.152 192.0.2.0/24"}]} ところがこの形式の JSON ログは Athena では受け付けられません。Athena は1行1エントリの JSON ログを要求するためです *3 。つまり上記例でいえば { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/YVR52-R2 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522674731119363142938582278340608 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP YVR52-R2 192.0.2.143 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO6-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522685262133562904066894168588288 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP SFO6-SO1 192.0.2.144 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO9-SN1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522690947843897953596035415605248 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com AAAA NOERROR UDP SFO9-SN1 192.0.2.144 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SEA900-R3 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522692678857209236868848315727872 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP SEA900-R3 2001:DB8::143 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/SFO6-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047076594859411017872522703737195603523266279501332480 "," timestamp ": 1750931471000 ," message ":" 1.0 2025-06-26T09:51:11Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP SFO6-SO1 192.0.2.144 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/ATL58-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047077085475805385546231824257833115761174552619646976 "," timestamp ": 1750931493000 ," message ":" 1.0 2025-06-26T09:51:33Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP ATL58-R1 192.0.2.148 192.0.2.0/24 " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/NRT8-SO1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047077085475805385546231840537028970579482687526338560 "," timestamp ": 1750931493000 ," message ":" 1.0 2025-06-26T09:51:33Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NXDOMAIN UDP NRT8-SO1 192.0.2.10 - " }] } { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/KUL51-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047082905970302202038872078382990508713243113432088576 "," timestamp ": 1750931754000 ," message ":" 1.0 2025-06-26T09:55:54Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP KUL51-R1 192.0.2.152 192.0.2.0/24 " }]} のような JSON ログとなっている必要があります。よって Data Firehose から S3 へログを流す際にその中身を書き換える必要が出てきます。Data Firehose ではデータ処理の過程で Lambda 関数にその役目を担わせることができるので *4 それを使うのも手ですが、お世話が必要な主体をあまり増やしたくありません。 同じような事例が無いか調査していたところ medium.com が基本的な構想として多いに参考になり、また S3 による レプリケーション を考える場合の prefix 付与においては dev.classmethod.jp が大変参考になりました。つまりはコード中のコメントにもあるとおり CloudWatch Logs から Data Firehose に流れてくるログは gzip 圧縮されているのでこれを展開 展開した内容は1行に複数の JSON オブジェクトが含まれる形式になっているので jq を使い1行1オブジェクトになるよう展開 S3 レプリケーション の事情で Data Firehose から S3 へログデータを置く場合は適当な prefix が欲しいので、これを "logs/" とできるよう設定 を Data Firehose のみで実施することが出来、これは dynamic partitioning *5 によって達成が可能ということになります。 本来 dynamic partitioning はログに含まれるキー値から S3 へオブジェクトを保存する際の prefix を決定し Athena をはじめとする S3 をデータソースとする解析系サービス向けに パーティション を整備するための機能ですが、弊社のケースではそこまで凝ったことは不要で、 JSON ログをその構造を維持しつつログエントリ単位で適当に改行したいという共有が満たせれば OK です。先に示したコードも processing_configuration ブロックが割合シンプルなものになっています。 このコードは前述2記事に拠るところが多大に有ります。この場を借りて感謝申し挙げます。 ログを Athena で検索する さて前項までに Route 53 由来の DNS クエリログを S3 に集約して保存できるようになりました。これを Athena で検索してゆくようにする手筈を整えます。 リ ゾル バクエリログについては Use partition projection - Amazon Athena で示される内容が充分実用に耐えるものになりますが、公開 DNS クエリログについては AWS としての公式サポートが CloudWatch Logs であるということを踏まえても想像に難くなく、このようなクエリ例が存在しません。従って自前で頑張る必要があります。 早い話以下のような内容が使えます。前述の Data Firehose コードによって S3 へログが送出される前提の内容です。例示値や置換すべき値は前述のリ ゾル バクエリ用の例に倣っています。 CREATE EXTERNAL TABLE r53_public_dns_logs ( messageType string, owner string, logGroup string, logStream string, subscriptionFilters array< string >, logEvents array< struct< id: string, timestamp : bigint, message: string > > ) PARTITIONED BY ( `datehour` string ) ROW FORMAT SERDE ' org.openx.data.jsonserde.JsonSerDe ' STORED AS INPUTFORMAT ' org.apache.hadoop.mapred.TextInputFormat ' OUTPUTFORMAT ' org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat ' LOCATION ' s3://amzn-s3-demo-bucket/route53-public-dns-query-logging/logs/aws_account_id/ ' TBLPROPERTIES( ' projection.enabled ' = ' true ' , ' projection.datehour.type ' = ' date ' , ' projection.datehour.range ' = ' 1970/01/01/00,NOW ' , ' projection.datehour.format ' = ' yyyy/MM/dd/HH ' , ' projection.datehour.interval ' = ' 1 ' , ' projection.datehour.interval.unit ' = ' HOURS ' , ' storage.location.template ' = ' s3://amzn-s3-demo-bucket/route53-public-dns-query-logging/logs/aws_account_id/$${datehour}/ ' ) 確かにこれで公開 DNS クエリログを検索できるのですが、クエリの内容をみてもわかるとおり、ログの中身で最も知りたい筈の DNS クエリ周辺の状況 ( logEvents[].message ) が string として扱われるに留まっており、少々厄介です。例えば { " messageType ":" DATA_MESSAGE "," owner ":" 123456789012 "," logGroup ":" LOG_GROUP "," logStream ":" ZxxxxxxxxxxxxxxxxxxxQ/KUL51-R1 "," subscriptionFilters ": [ " SUBSCRIPTION_FILTER " ] ," logEvents ": [{ " id ":" 39047082905970302202038872078382990508713243113432088576 "," timestamp ": 1750931754000 ," message ":" 1.0 2025-06-26T09:55:54Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP KUL51-R1 192.0.2.152 192.0.2.0/24 " }]} というログが有ったとして、この中で真に知りたいのは 1.0 2025-06-26T09:55:54Z ZxxxxxxxxxxxxxxxxxxxQ sample.example.com A NOERROR UDP KUL51-R1 192.0.2.152 192.0.2.0/24 であって、これを適当な列に分割したうえで列に対して具体的な値やパターン等を当て嵌めて検索するということが本来やりたいことです。とれる手段は Data Firehose でのデータ処理時に Lambda 関数を噛ませて JSON ログ中 logEvents[].message だけを S3 へ送出する対象とする ログは加工せず、Athena で頑張る というものが考えられそうですが、弊社のケースでは先述の通り「お世話が必要な主体をあまり増やしたくない」ので、Athena で頑張る方法を選びました。具体的には ログデータを直接扱い Athena 上で取り回しのしやすい構造にする為のテーブル 上述の Athena テーブル定義による ログデータから DNS クエリログに関する内容(= logEvents[].message )だけを検索対象とするビュー 後述 といったようにテーブル以外に Athena ビュー *6 を用意することで対処しています。具体的には以下のようなビュー定義を使用しています。 -- Route 53 公開 DNS クエリログを Athena で扱うためのビュー -- r53_public_dns_logs というテーブルを元ネタとして DNS クエリログを直接 Athena で検索できるようにするためのもの CREATE OR REPLACE VIEW r53_public_dns_log_view AS SELECT -- 正規表現を使い、message フィールドを仮想的な列に分割 -- 正規表現の各( )がキャプチャグループ(1から始まるインデックス)に対応 regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 1 ) AS version, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 2 ) AS timestamp , regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 3 ) AS hosted_zone_id, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 4 ) AS name, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 5 ) AS type , regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 6 ) AS response_code, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 7 ) AS protocol, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 8 ) AS edge_location, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 9 ) AS r_ip, regexp_extract(e.message, ' ^([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (.*)$ ' , 10 ) AS c_ip, l.datehour FROM -- 対応する Athena テーブル名を指定する -- ログ内容は datehour によってパーティションが切られているのでビューでもこれを使えるようにする r53_public_dns_logs l CROSS JOIN UNNEST(l.logEvents) AS t(e) -- 元のログにおける logEvents 配列を展開 ここで作成したビューを対象として検索を実施することで、公開 DNS クエリログもリ ゾル バクエリログと同等の使い勝手で Athena にて取り回すことが可能になります。 おわりに Route 53 由来の DNS クエリログを Athena で取り扱う方法について解説しました。 S3 へのログ保存が公式にサポートされているリ ゾル バクエリログでは Athena による検索およびその運用に関する tips が数多く見付かる一方、公開 DNS クエリログについては CloudWatch Logs 以外の場所での保管を自前で頑張らないといけない事情で、ログ検索それ自体の tips は然程多くない現状があります。 両方の DNS クエリログを同等の手段(本記事では Athena + S3 ベースで)で横断して追跡できるようにすることで、ログ利用の手間感の低減や新たな洞察を得ることの切っ掛けになるはずです。実際に弊社ではこの手法で DNS クエリログを割合気軽に追えるようになったことで、これまであまりケアできていなかった DNS 関連の運用改善や外部からのリク エス ト調査に新たな観点を導入するといった効果が得られ、予想よりも多くの嬉しさがありました。 DNS クエリログ収集やその運用改善に本稿が一助となれば幸いです。 MNTSQ 株式会社 SRE 秋本 *1 : AWS resources that you can send Resolver query logs to - Amazon Route 53 *2 : Public DNS query logging - Amazon Route 53 *3 : https://repost.aws/ja/knowledge-center/error-json-athena *4 : https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html *5 : https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html *6 : https://docs.aws.amazon.com/athena/latest/ug/views.html
アバター
こんにちは、MNTSQ( モンテスキュー )で アルゴリズム エンジニアをしている清水です。 MNTSQは契約書を解析・管理・検索するプロダクトを提供しています。これらのプロダクトには大規模 言語モデル (以下LLM)が搭載された機能が実装されています。また、LLMを活用した新プロダクトも鋭意開発中です。 LLMをアプリケーションに組み込む際の大きな課題の一つとして、 「LLMの出力形式(型)を如何に矯正するか?」 が挙げられます。単純なチャットアプリケーションであればそこまで問題にはなりませんが、LLMによる生成結果を後続のプログラムで処理する必要がある場合、事前に定義された型に従って出力を生成する必要があります。 現在、複数のLLMサービスで出力形式を制御する機能が搭載されていますが、本記事では Google が提供している Gemini の Structured output を取り上げます。本記事では、 開発の過程で得られた、GeminiのStructured outputにおける7つのTips を紹介したいと思います。 サンプルコード 例として以下のように Python の Google Gen AI SDK ( google -genai)を使用することを想定しています。 google -genaiでは types.GenerateContentConfig の response_schema に Pydantic のモデルを渡すことで、Structured outputを使用することができます。 本記事ではStructured outputの機能にフォーカスするのでプロンプトは最低限の内容にしています。また、 スキーマ として指定するPydanticモデルも、タイトルの抽出と契約書かどうかを判定するだけのシンプルなものにしています。また、Gemini API ではなく Vertex AI の API を介してGeminiを使用します。(ほとんどのケースでGemini API に対しても同じTipsが適用できると思いますが、一部仕様が異なる可能性があります。) from google.genai import Client, types from pydantic import BaseModel PROMPT_TEMPLATE = """ \ JSONスキーマに従って、ドキュメントの内容を分析してください。 <json_schema> {json_schema} </json_schema> <document> {document_text} </document> """ class ContractAnalysisResult (BaseModel): document_name: str is_contract: bool def analyze_contract (document_text: str ) -> ContractAnalysisResult: client = Client(vertexai= True , project= "development" , location= "global" ) prompt = PROMPT_TEMPLATE.format( json_schema=ContractAnalysisResult.model_json_schema(), document_text=document_text, ) contract_analysis_result = client.models.generate_content( model= "gemini-2.5-flash" , contents=prompt, # response_schemaにPydanticモデルを渡す config=types.GenerateContentConfig( response_schema=ContractAnalysisResult, ), ) return contract_analysis_result Tips 1: プロンプトに JSON スキーマ を含めない 一番簡単に試すことができるテクニックは、「プロンプトに JSON スキーマ を含めない」です。実は、 response_schema を設定した場合は、 JSON スキーマ をプロンプトに含めない ことが 公式のドキュメント で推奨されています。 警告:   responseSchema  を構成する場合は、テキスト プロンプトで スキーマ を指定しないでください。これにより、予期しない結果や品質の低い結果が生じる可能性があります。 以下のサンプルコードでは、上記のサンプルコードから JSON スキーマ を埋め込んでいた箇所を削除しています。 # response_schemaを指定する際には、JSONスキーマをプロンプトに含めない PROMPT_TEMPLATE = """ \ JSONスキーマに従って、ドキュメントの内容を分析してください。 <document> {document_text} </document> """ Structured outputについてすべての仕組みが詳細に明かされているわけではないので、なぜ JSON スキーマ をプロンプトに含めないことが推奨されるのか技術的な理由は分かりません。公式のドキュメントで”don't duplicate the schema in the text prompt.”と書いてあることから、重複した情報をLLMに与えることが悪影響を及ぼすのかもしれません。 また、OpenAIやAnthropicのドキュメントには同様の記述は見当たらず(見逃していたらすみません)、Gemini特有の性質である可能性もあります。 Tips 2: title と description を設定する JSON スキーマ の各フィールドにおいて、 自然言語 による説明を付けたい時は以下のように title や description フィールドを使いましょう 。Structured outputでは JSON スキーマ による構造化されたデータしか渡せないと勘違いされがちですが、LLMらしく 自然言語 による情報も与えることができます。 from pydantic import BaseModel, Field class ContractAnalysisResult (BaseModel): document_name: str = Field( title= "タイトル (Title)" , description= "ドキュメント冒頭に記載されているタイトル" , ) is_contract: bool = Field( title= "契約書かどうか (Is Contract)" , description= "ドキュメントが契約書かどうかを示す。就業規則や賃金規定などは契約書ではない。" , ) ここで定義された title と description は Vertex AIのAPIリファレンス で記述されている responseSchema フィールドの title フィールドと description フィールドに渡されます。(詳しくは次の項目で言及します) Tips 3: API に渡されるパラメータを確認する Pydanticモデルを response_schema に渡すだけでStructured outputを使用できますが、 最終的にどのような形式で API に渡されるのかを確認する ことは有効です。以下のようにして、 response_schema に渡したPydanticモデルが、どのように API に渡すためのパラメータに変換されるかを確認することができます。 from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel, Field class ContractAnalysisResult (BaseModel): document_name: str = Field( title= "タイトル (Title)" , description= "ドキュメント冒頭に記載されているタイトル" , ) is_contract: bool = Field( title= "契約書かどうか (Is Contract)" , description= "ドキュメントが契約書かどうか。就業規則や賃金規定などは契約書ではない。" , ) if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) # _transformers.t_schemaにClientオブジェクトとPydanticモデルを渡す request_params = t.t_schema(client, ContractAnalysisResult) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果 { " properties ": { " document_name ": { " description ": " ドキュメント冒頭に記載されているタイトル ", " title ": " タイトル (Title) ", " type ": " STRING " } , " is_contract ": { " description ": " ドキュメントが契約書かどうか。就業規則や賃金規定などは契約書ではない。 ", " title ": " 契約書かどうか (Is Contract) ", " type ": " BOOLEAN " } } , " property_ordering ": [ " document_name ", " is_contract " ] , " required ": [ " document_name ", " is_contract " ] , " title ": " ContractAnalysisResult ", " type ": " OBJECT " } 例えば、私は開発の過程で以下のような google -genaiの仕様(というよりはバグ?)を見つけました 1 。 下記のように、Pydanticモデルが 入れ子 構造になっている スキーマ において、以下のように親モデルの Field において title と description を設定します。 from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel, Field class ContractTerm (BaseModel): effective_date: str expiration_date: str class ContractAnalysisResult (BaseModel): contract_term: ContractTerm = Field( title= "契約期間 (Contract Term)" , description= "契約有効日と失効日から構成される契約の期間。" , ) if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, ContractAnalysisResult) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) この スキーマ の t_schema の出力を確認してみると、以下のように title と description が消えてしまっていることが確認できます。( title はデフォルト値のクラス名( ContractTerm )が代わりに格納されています。) { " properties ": { " contract_term ": { " properties ": { " effective_date ": { " title ": " Effective Date ", " type ": " STRING " } , " expiration_date ": { " title ": " Expiration Date ", " type ": " STRING " } } , " property_ordering ": [ " effective_date ", " expiration_date " ] , " required ": [ " effective_date ", " expiration_date " ] , " title ": " ContractTerm ", " type ": " OBJECT " } } , " required ": [ " contract_term " ] , " title ": " ContractAnalysisResult ", " type ": " OBJECT " } 以下のように親モデルの Field ではなく子モデルの ConfigDict で title を、docstringで description を設定すると、問題なく変換されます。 from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel, ConfigDict class ContractTerm (BaseModel): """契約有効日と失効日から構成される契約の期間。""" # docstringを設定するとdescriptionとして認識される model_config = ConfigDict(title= "契約期間 (Contract Term)" ) effective_date: str expiration_date: str class ContractAnalysisResult (BaseModel): contract_term: ContractTerm if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, ContractAnalysisResult) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果 { " properties ": { " contract_term ": { " description ": " 契約有効日と失効日から構成される契約の期間。 ", " properties ": { " effective_date ": { " title ": " Effective Date ", " type ": " STRING " } , " expiration_date ": { " title ": " Expiration Date ", " type ": " STRING " } } , " property_ordering ": [ " effective_date ", " expiration_date " ] , " required ": [ " effective_date ", " expiration_date " ] , " title ": " 契約期間 (Contract Term) ", " type ": " OBJECT " } } , " required ": [ " contract_term " ] , " title ": " ContractAnalysisResult ", " type ": " OBJECT " } 思ったように型の矯正が効かないときはこのようなエッジケースを踏んでいるのかもしれません。そのような時は、この方法を使って API に渡されるパラメータを確認すると良いでしょう。 Tips 4: date 型や datetime 型を使用する スキーマ を定義するPydanticモデルの各フィールドおいて、 Python の date 型や datetime 型を使用する ことができます。以下のPydanticモデルを t_schema に渡すと以下のようなパラメータに変換されていることが確認できます。 from datetime import date from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel class ContractTerm (BaseModel): effective_date: date # str型ではなくdate型を指定 expiration_date: date if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, ContractTerm) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果( "format": "date", となっている箇所に注目してください) { " properties ": { " effective_date ": { " format ": " date ", " title ": " Effective Date ", " type ": " STRING " } , " expiration_date ": { " format ": " date ", " title ": " Expiration Date ", " type ": " STRING " } } , " property_ordering ": [ " effective_date ", " expiration_date " ] , " required ": [ " effective_date ", " expiration_date " ] , " title ": " ContractTerm ", " type ": " OBJECT " } この format フィールドは title や description と同様に API の responseSchema でサポートされているフィールドです。ただし、どのようなformatでも良いわけではなく現状は date 、 date-time 、 time 、 duration のみが サポートされているようです 。それぞれ Python のdatetimeライブラリの date クラス、 datetime クラス、 time クラス、 timedelta クラスが対応しています 2 。 from datetime import date, datetime, time, timedelta from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel class DateTimeClasses (BaseModel): date_field: date datetime_field: datetime time_field: time duration_field: timedelta if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, DateTimeClasses) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果 { " properties ": { " date_field ": { " format ": " date ", " title ": " Date Field ", " type ": " STRING " } , " datetime_field ": { " format ": " date-time ", " title ": " Datetime Field ", " type ": " STRING " } , " time_field ": { " format ": " time ", " title ": " Time Field ", " type ": " STRING " } , " duration_field ": { " format ": " duration ", " title ": " Duration Field ", " type ": " STRING " } } , ... } Tips 5: その他サポートされている API のフィールドを使用する responseSchema がサポートしているフィールドは、上記で紹介した title 、 description 、 format フィールド以外にもあります。詳しくは 公式ドキュメント をご参照ください。これらのフィールドはPydanticで以下のように表現できます。 from datetime import date, datetime, time, timedelta from enum import Enum from google.genai import Client from google.genai import _transformers as t from pydantic import BaseModel, Field class EnumClass (Enum): A = "a" B = "b" C = "c" class Schema (BaseModel): number_field: int = Field(ge= 1 , le= 10 ) string_field: str = Field(min_length= 1 , max_length= 10 ) list_field: list [ int ] = Field(min_items= 1 , max_items= 10 ) with_pattern_field: str = Field(pattern= r"^[a-z]+$" ) # examplesを渡すとエラーになるので注意 with_example_field: str = Field(json_schema_extra={ "example" : "example string" }) nullable_field: str | None = Field(default= None ) any_of_field: str | int enum_field: EnumClass if __name__ == "__main__" : client = Client(vertexai= True , project= "development" , location= "global" ) request_params = t.t_schema(client, Schema) print (request_params.model_dump_json(indent= 2 , exclude_none= True )) 出力結果 { " properties ": { " number_field ": { " maximum ": 10.0 , " minimum ": 1.0 , " title ": " Number Field ", " type ": " INTEGER " } , " string_field ": { " max_length ": 10 , " min_length ": 1 , " title ": " String Field ", " type ": " STRING " } , " list_field ": { " items ": { " type ": " INTEGER " } , " max_items ": 10 , " min_items ": 1 , " title ": " List Field ", " type ": " ARRAY " } , " with_pattern_field ": { " pattern ": " ^[a-z]+$ ", " title ": " With Pattern Field ", " type ": " STRING " } , " with_example_field ": { " example ": " example string ", " title ": " With Example Field ", " type ": " STRING " } , " nullable_field ": { " nullable ": true , " title ": " Nullable Field ", " type ": " STRING " } , " any_of_field ": { " any_of ": [ { " type ": " STRING " } , { " type ": " INTEGER " } ] , " title ": " Any Of Field " } , " enum_field ": { " enum ": [ " a ", " b ", " c " ] , " title ": " EnumClass ", " type ": " STRING " } } , " property_ordering ": [ " number_field ", " string_field ", " list_field ", " with_pattern_field ", " with_example_field ", " nullable_field ", " any_of_field ", " enum_field " ] , " required ": [ " number_field ", " string_field ", " list_field ", " with_pattern_field ", " with_example_field ", " any_of_field ", " enum_field " ] , " title ": " Schema ", " type ": " OBJECT " } Tips 6: エラー回避のためのvalidatorを実装する 上記で紹介した date 型のフィールドや max_ength などはPydanticモデルの制約として働きます。例えば、 date 型のフィールドに無効な日付の文字列が代入されるとエラーになります。また、 max_length=10 と指定されているフィールドに11文字以上の文字列が渡されると同じくエラーになります。 この時、 Geminiがこれらの制約に違反した JSON を生成する可能性がある ことに注意が必要です。一定の矯正力はありますが、100%制約を守ってくれるわけではありません 3 。制約に違反したテキストが生成されたときにエラーにならないように、 生成されたテキストを加工するvalidatorを実装しておく と安全でしょう。 例えば私は、 date 型のフィールドに対して 0000-01-01 のような無効な日付をGeminiが生成するケースを観測したことがあります。この場合、以下のようなvalidatorを実装してエラーを回避すると良いでしょう。 import logging from datetime import date from typing import Any from pydantic import BaseModel, Field, ModelWrapValidatorHandler, ValidationError, field_validator class EffectiveDate (BaseModel): effective_date: date | None = Field(default= None ) @ field_validator ( "effective_date" , mode= "wrap" ) def date_parsing_validator (value: Any, handler: ModelWrapValidatorHandler[Any]) -> Any: """0000-01-01のような無効な日付をNoneに変換する""" try : return handler(value) except ValidationError as e: if "date_parsing" in (error[ "type" ] for error in e.errors()): logging.warning(f "Invalid date: {value}" ) return None else : raise e if __name__ == "__main__" : # Geminiが0000-01-01のような無効な日付を生成したと想定 effective_date = EffectiveDate.model_validate_json( '{"effective_date": "0000-01-01"}' ) print (effective_date) # WARNING:root:Invalid date: 0000-01-01 # effective_date=None Tips 7: Chain of Thoughtを意識する Chain of Thought (CoT)とは、結論だけでなく推論の過程も生成させることでLLMの性能を向上させる手法のことです。通常はプロンプトを工夫したり、専用にチューニングされたモデルを使用することでCoTを実現するのですが、 response_schema を工夫することで擬似的なCoTを実現することができます 。 例として、以下のようなPydanticモデルを定義します。 from datetime import date from pydantic import BaseModel, Field class ContractTerm (BaseModel): effective_date: date | None = Field(description= "契約期間の有効日" ) period: int | None = Field(description= "契約が持続する期間" ) expiration_date: date | None = Field(description= "契約期間の失効日" ) 欲しいのは effective_date と expiration_date だけですが、同時に period も抽出するようにしています。このようにすることで、例えば「本契約は2025年1月1日から3年間有効とする」のように契約失効日が直接的に書かれていない場合でも、事前に抽出した effective_date と period から expiration_date を計算してくれる効果が期待できます。 このように、フィールドを定義する順番を工夫したり、関連する情報を抽出するように促すことで、擬似的なCoTが期待できるでしょう。 注意事項 本記事で紹介した内容は、2025年6月時点のGemini/Vertex AIの仕様と、 google -genaiのバージョン1.19.0の仕様に基づいています。今後のアップデートによってGeminiや SDK の仕様が変更される可能性があります。実際に利用される際は、必ず公式のドキュメントをご確認いただくようお願いします。 まとめ 本記事では、GeminiのStructured outputでレスポンスの型を矯正するためのTipsをいくつか紹介しました。開発で得られた知見を全て盛り込んだら想定よりも多い文字数になってしまいました。是非開発のヒントにしていただけたら幸いです。 冒頭でも触れましたが、MNTSQではLLMを活用したプロダクトを鋭意開発中です。もしMNTSQの仕事にご興味を持っていただけたら、 ぜひお気軽にカジュアル面談でお話ししましょう! careers.mntsq.co.jp note.mntsq.co.jp tech.mntsq.co.jp この記事を書いた人 清水健 吾 MNTSQ アルゴリズム エンジニア LLMのご機嫌と格闘する日々です。 google -genaiバージョン1.19.0時点での動作です。バグであれば今後解消されるかもしれません。 ↩ 他にも Pendulum も対応しています。 ↩ どの程度の矯正力を持つかはフィールドによって異なるようです。例えば私の場合 enum フィールドに違反したケースに遭遇したことはありません。反対に min_length , max_length は矯正力が弱く、validatorの実装は必須だと思われます。 ↩
アバター
openapi-ts 導入 こんにちは、MNTSQ のソフトウェアエンジニアの森山です。今回は、 REST API の OpenAPI 3.0 から API クライアントを自動生成するまでの過程を紹介します。 実はメインのプロダクトへ TypeScript を導入できたのはつい最近のことです。 API クライアントを自動生成するまでの苦労や新たな発見が 1 つでも参考になれば嬉しく思います。 課題 API クライアントの自動生成に取り組む上で、現在の BE と FE には以下の課題がありました。 BE API フレームワーク 移行期のため、OpenAPI 2.0 と 3.0 の 2 つの API 定義ファイルが存在し、自動生成前に merge が必要。 FE TypeScript へ移行できていない JavaScript が大半。 API コールを堅牢にするための独自の機構が複雑で認知負荷が高い。 API レスポンスが class 化されているが TypeScript の型として利用できない。 自動生成の目的 型や API クライアントの自動生成の目的は以下です。 よりシンプルな API コール API の破壊的変更を検知 TypeScript の導入を加速 詳細な背景は以下の通りです。 よりシンプルな API コール 独自の機構を撤廃し、 API コール処理の認知負荷を下げたい。TypeScript の型でよりシンプルに解決できる。 API の破壊的変更を検知 既存の API コールを堅牢化する機構はランタイム上で動作します。そのため API の破壊的な変更を開発中に見逃すことがありました。開発中に 機械的 に検知できる必要がある。 TypeScript の導入を加速 型が自動生成できると以下の要因で加速できる。 TypeScript を導入したばかりで使える型が少ないが、一気に使える型が増える。 型のメンテナンス 工数 が削減できる。 過剰なプロパティを持った型が生まれない。(流用性を高める意図で生まれやすい) ライブラリの比較 以下の 3 つのライブラリが検討の対象です。 openapi-ts(採用) openapi-typescript swagger-typescript- api 結果としては 1. openapi-ts を採用しました。 次にその選定における観点と過程を説明します。 比較観点 型の流用性 API コール時の認知負荷 型の流用性 特に API のパラメータやレスポンスの型が流用しやすい形式であるか。それらの型は API コールの前後の加工処理等で参照したいことがあります。出力される型が API コールの関数のみだと、その関数の型から引数や返り値の型を抜き出す必要があるため複雑になります。 API コール時の認知負荷 API コール時のインターフェースがシンプルかどうか。 API コールのために関数や型をいくつも import したくないです。関数名を書いただけで補完が始まり実装が自然と進んでいく体験が理想です。 以下は上記の観点を具体化した比較表です。 ライブラリ api クライアントの生成 snake_case ↔ camelCase の変換 自動生成時の安定性 型の流用性 API コールに必要な import 数 endpoint の型制御 path parameter の型制御 query parameter の型制御 request body の型制御 response body の型注釈 openapi-ts ◯ ◯ ◯ ◯ △ ◯ ◯ ◯ ◯ ◯ openapi-typescript x ◯ ◯ ◯ x △ △ ◯ ◯ ◯ swagger-typescript- api ◯ ◯ x △ ◯ ◯ ◯ ◯ ◯ ◯ 各ライブラリごとにプロトタイプを実装しました。手を動かして得た発見と評価も合わせて以下に記載します。 openapi-ts メリット API のパラメータ、リク エス トの型が独立して定義されている API コールの関数を自動生成できる API クライアント(fetch, axios, …etc)を選択できる あらゆる型補完が効く デメリット API クライアントの インスタンス を API コールの関数に都度渡す API コールの際に API クライアントの インスタンス として毎回同じものを渡すのが冗長です。しかし、それ以外は観点を満たしていました。 実装例 import { typedAxios } from "./client" import { postV2Authentication } from "./generated/sdk.gen" import { getV2DocumentDocumentId } from "./generated/sdk.gen" import { getV2DocumentDiff } from "./generated/sdk.gen" // 認証 postV2Authentication( { client : typedAxios, // fetchやaxios等のAPIクライアントを毎回渡す必要がある body : { email , password } , } ) // document取得 getV2DocumentDocumentId( { client : typedAxios, path : { documentId : 1 } , } ) // user取得 getV2User( { client : typedAxios, query : { userId : 1 } , } ) requestBody の型参照も簡単です。 import { typedAxios } from "./client" import { postV2Authentication } from "~/api/openapi-ts/generated/sdk.gen" import type { PostV2AuthenticationData } from "./generated" // requestBodyの型をimport(pathパラメータ、queryパラメータも可) export const authentication = async ( { email , password } : PostV2AuthenticationData[ "body" ]) => { // ...何か前処理をしたり const response = await postV2Authentication( { client : typedAxios, body : { email , password } , } ) return response.data } openapi-typescript メリット 型のみの生成でカスタマイズ性が高い デメリット API コールの関数生成には派生ライブラリの openapi-fetch が必要( API クライアントは fetch 限定) axios を利用すると必要な import が多い API クライアントが fetch であれば有力だった可能性がありますが、現状は axios を活用しています。また型のみを生成するのは流用性が高く良いと思っていました。しかし axios に型を渡して矯正すると API コールのために必要な import が増えます。そして型の構造的に必要な型を探り当てるのが面倒でした。 実装例 import { type paths, type operations } from "./generated/schema.d" import { typedAxios } from "./client" // 認証 type Request = operations [ "postV2Authentication" ][ "requestBody" ][ "content" ][ "application/x-www-form-urlencoded" ] type Response = operations [ "postV2Authentication" ][ "responses" ][ "201" ] typedAxios.post< Response >( "/v2/authentication" , { email , password , } ) // ドキュメント取得 type Request = operations [ "getV2DocumentDocumentId" ][ "parameters" ][ "path" ] type Response = operations [ "getV2DocumentDocumentId" ][ "responses" ][ "200" ][ "content" ][ "application/json" ] typedAxios. get < Response >( `/v2/document/ ${ documentId } ` ) 上記はプレーンな axios のため import が多く、型の深堀りが必要です。 それを解消したものも実装しました。OpenAPI 3.0 の構造では HTTP メソッドと endpoint の組み合わせで欲しい API が特定できます。そのため HTTP メソッドと endpoint を渡せばパラメータやリク エス トの型を推論できる axios を実装しました。コードすべてではないですが実装の概要は把握できると思います。 渡す型を最小限にした axios // カスタマイズしたaxios const customAxios = async <M extends Methods, E extends Endpoint< M >>( { methods , endpoint , pathParams , queryParams , body , } : { methods : M endpoint : E pathParams ?: Snake2Camel < PathParams < M >> queryParams ?: Snake2Camel < QueryParams < M >> body ?: Snake2Camel < RequestBody < M , E >> } ): Promise < AxiosResponse < Snake2Camel < SuccessResponse < M , E >>>> => { const dynamicEndpoint = pathParams ? getDynamicEndpoint(endpoint, camel2SnakeDeep(pathParams)) : endpoint const snakeCaseBody = body ? camel2SnakeDeep(body) : body const response = await axios[methods]< SuccessResponse < M , E >>( ` ${ dynamicEndpoint }${ getQueryParams(queryParams) } ` , snakeCaseBody ) return { ...response, data : snake2CamelDeep(response.data), } } // 呼び出しイメージ await customAxios( { methods : "get" , endpoint : "/api/v2/document/{document_id}" , pathParams : { documentId } , } ) 呼び出し時には補完が HTTP メソッド → endpoint → パラメータと順番に絞り込まれるように推論されます。しかし見ての通り実装が複雑です。他のライブラリのように endpoint 毎に関数が生えた方が圧倒的にリーダブルです。また上記を用いて AI にコード生成を指示するとコード生成 → 型エラー → コード生成 を繰り返して徐々に正しいコードに近づけていく様子で、AI の精度が落ちるのも難点でした。 swagger-typescript- api メリット 呼び出しが最もシンプル API クライアント(fetch, axios, …etc)を選択できる デメリット JSON ファイルに特定の文字が含まれると自動生成に失敗する パラメータやレスポンスの型が流用しづらい API コールのインターフェースは最もシンプルでした。しかし 参照元 の JSON ファイルに「*( アスタリスク )」が含まれていると自動生成に失敗します。OpenAPI のコメント等には様々な文字列を使う可能性があるため運用が辛くなる印象です。またパラメータやレスポンスの型が独立して参照できません。型の取り出しが面倒でした。 型の取り出し import { typedAxios } from "./typedAxios" export const getDocument = async ( { documentId , } : // 特定のqueryパラメータが欲しい時にapiの関数から引数の型を抜き出す必要がある。 Parameters< typeof typedAxios.v2.getV2Document >[ "0" ]) => { return await typedAxios.v2.getV2Document( { documentId , } ) } 実装例 import { Api } from "./api" const typedAxios = new Api() // 認証 typedAxios.v2.postV2Authentication( { email , password , } ) // ドキュメント取得 typedAxios.v2.getV2DocumentDocumentId(documentId) ライブラリ比較まとめ 消去法的に openapi-ts を採用しました。 以下の懸念が他ライブラリのノックアウトファクターでした。 openapi-typescript 呼び出し時の認知負荷の高さ swagger-typescript- api JSON ファイルに使われている文字を気にする必要がある 型の流用性が低い 導入の前処理 冒頭にあった課題を払拭するために以下の前処理が必要です。 OpenAPI 2.0(Swagger) → OpenAPI 3.0 の変換 openapi. json の merge snake_case ↔ camelCase の変換 OpenAPI 2.0(Swagger) → OpenAPI 3.0 の変換 API クライアントの自動生成ライブラリは OpenAPI 3.0 形式であることを想定しているため、 前処理として swagger2openapi というライブラリで OpenAPI 2.0(Swagger) → OpenAPI 3.0 へ変換しました。 npx swagger2openapi swagger.json -o openapi.json 1 コマンドでキレイに 2 系 →3 系になってくれて嬉しかったです。 openapi. json の merge openapi-ts が読み込める JSON ファイルは 1 つなので、2 つの API 定義 JSON を merge します。openapi. json の中には様々なプロパティがありますが、merge したいのは以下の 2 つです。 path: 各 endpoint と HTTP method 等の情報が定義 components: 具体的な スキーマ を内包 上記を単純に merge することで欲しい json が手に入りました。 import * as openapiJson1 from "openapi-1.json" import * as openapiJson2 from "openapi-2.json" import fs from "fs" /** * openapi.jsonをマージして新規ファイルとして出力 */ const mergedJson = { ...openapiJson1, paths : { ...openapiJson1.paths, ...openapiJson2.paths, } , components : { ...openapiJson1.components, ...openapiJson2.components, } , } fs.writeFileSync( "merged.json" , JSON . stringify (mergedJson, null , 2 )) snake_case ↔ camelCase の変換 FE のコーディングスタイルが camelCase なのに対して BE は snake_case です。この乖離については API コールのパラメータ作成時やレスポンス受け取り時に変換をする必要があります。 API コール時に都度変換するのは認知負荷が高いため共通処理に含めることにしました。 共通処理に含めるデメリットとして以下があります。 変換ユーティリティの開発・メンテナンスの手間 ランタイム上の変換処理によるオーバーヘッド しかし上記よりも開発者体験の方が価値があると判断しました。 また重要なポイントとして API クライアントの自動生成前の API 定義 JSON にもケース変換を施しました。つまり API 定義 JSON の時点でパラメータやレスポンスを camelCase にしておきました。これをしないと生成後の API クライアントが型補完で snake_case を要求してしまうので type error になります。openapi. json の時点でケース変換ができると関数の引数と返り値の型としては camelCase で出力してくれます。あとは axios の interceptors に変換処理を入れるだけです。 axios の interceptors import { client } from "client.gen" // 自動生成されたaxiosのクライアント client.setConfig( { baseURL : "/" } ) /** リクエストパラメータをsnake_caseに変換 */ client.instance.interceptors.request.use(( request : InternalAxiosRequestConfig < any >) => { // snake_caseへの変換処理 return request } ) /** レスポンスデータをcamelCaseに変換 */ client.instance.interceptors. response .use(( response : AxiosResponse < any , any >) => { // camelCaseへの変換処理 return response } ) export { client } 本筋から脱線: 型の上書きでケース変換 BE が生成した 参照元 の JSON を FE の都合に合わせて変更してしまうと思わぬ不都合が起きるのではと懸念がありました。そのため型の上書き等も試してみました。 いざ試すと生成後の関数や型に対しての TypeScript 上でのケース変換はしんどいです。例えばパスパラメータの型を snake_case から camelCase に変換するだけでも後述の複雑な型が必要になります。またランタイムで実行されるコードと比較して型に対しての検証は難しいです。そのためこの複雑な実装よりかは JSON を書き換える方が妥当と考えました。 型変換の一部 type Snake2CamelString < T extends string > = T extends ` ${ infer R } _ ${ infer U } ` ? ` ${ R }${ Capitalize< Snake2CamelString < U >> } ` : T // keyをsnake_case → camelCase type Snake2Camel < T > = T extends any [] ? Snake2Camel < T [number]>[] : T extends object ? { [ K in keyof T as Snake2CamelString < string & K >]: Snake2Camel < T [K]> } : T // httpメソッド type Methods = "get" | "post" | "put" | "patch" | "delete" // endpointのURL type Endpoint < M extends Methods > = { [ Key in keyof paths ]: M extends keyof paths [Key] ? Key : never } [keyof paths] // path parameter type PathParams < M extends Methods > = { [ Key in Endpoint < M >]: M extends keyof paths [Key] ? paths [Key][M] extends { parameters : { path : infer T } } ? T extends { [ key : string ]: string | number } ? T : never : never : never } [Endpoint<M>] // 最終的に欲しいpathParamsの型。 // これ以外にもqueryParamsやrequestBody,responseBodyにも似たようでちょっと違う変換をするgenericが必要 Snake2Camel< PathParams < M >> (脱線終わり。) before / after 今までは API コールの前後にクラスを通していました。 API クライアント自動生成後は関数を呼ぶだけでシンプルです。BE で破壊的変更も type error として検知できます。 今までの呼び出しイメージ import { repositoryFactory } from "./repositoryFactory" // parameterのバリデーションやケース変換 import { DocumentEntityClass } from "./documentEntityClass" // responseのケース変換やオブジェクト化 const documentGetRequest = repositoryFactory . documentDiff . getParam () documentGetRequest . documentId = documentId const documentEntity = new DocumentEntityClass () const response = await repositoryFactory . documentDiff . get ({ documentGetRequest }) documentEntity = response . data 新しい API コール import { typedAxios } from "./client" import { getV2DocumentDocumentId } from "./generated/sdk.gen" getV2DocumentDocumentId( { client : typedAxios, path : { documentId } , } ) まとめ API 定義 JSON から型だけを出力しても、認知負荷の低い API コールの実現は難しいことが分かりました。型のみでは結局、認知負荷を下げるために共通処理に複雑さが必要になってしまいます。 共 通化 するのではなく、シンプルな成果物に変換できる機構が必要でした。頑張って共 通化 し、インターフェースがシンプルになれば実装が捗ると思っていましたが、複雑さのシワ寄せとして AI のコード生成精度に影響するという気づきも得ることができました。 また今回の選定においては早めにプロトタイプを実装したことが良かった点だと振り返って思います。やりたいことや実現したいことの中核はぼんやりありましたが、実際に手を動かしてみることで比較するべき観点や実装イメージが具体化されました。ドキュメントに記載のない思わぬ欠点を早めに検知したことも収穫でした。 ご精読ありがとうございました。こうした技術的な意思決定のプロセスや、MNTSQ の日々の開発の進め方にご興味を持っていただけた方は、ぜひお気軽にカジュアル面談でお話ししましょう。 careers.mntsq.co.jp
アバター
小ネタです。そして掲題が全てを語っています。 以下、ECS on EC2 構成の ECS サービスにおいて ECS タスクを動作させるプラットフォームとなる EC2 インスタンス を ECS コンテナ インスタンス と呼称します。これは Launching an Amazon ECS Linux container instance へ微妙に倣っての呼び方になります。 3行で ECS on EC2 構成の ECS サービスで GuardDuty ECS Runtime Monitoring を有効化する場合、ECS サービスの更新は必要ない ECS コンテナ インスタンス で GuardDuty エージェントが動作し EC2 Runtime Monitoring の要件を満たせれば、その時点で ECS Runtime Monitoring も有効になる ECS コンテナ インスタンス での EC2 Runtime Monitoring 導入には GuardDuty エージェントが要求する制約がいくつかある なお本稿が伝えたいことは末尾の 追伸 で全て事足ります。 背景 現在 MNTSQ では SRE を中心にプロダクトセキュリティの向上施策を進めており、その中で GuardDuty の利用範囲拡充も目論んでいます。 ここで白羽の矢が立ったのが GuardDuty の機能のひとつである Runtime Monitoring です。詳細は AWS 公式ドキュメント( GuardDuty Runtime Monitoring ) に譲りますが、EC2 / EKS / ECS において インスタンス やタスクの振舞いを内部から観測し、脅威となりうる挙動の検出が可能なサービスです。 サポートされるサービスは前掲ドキュメントにもある通り EKS / ECS / EC2 で、EKS 以外は MNTSQ のワークロードにも合致します。 さて Runtime Monitoring の導入ですが、この方法にはいくつかの経路があります。 Enabling GuardDuty Runtime Monitoring にその全容があり、実に多様な手法が用意されていることが伺えます。MNTSQ では 全ての AWS アカウントは AWS Organizations で管理している GuardDuty 管理用に delegated admin として設定された AWS アカウント( AWS Organizations 配下だが organizations 管理アカウントではない)が存在する 導入対象は ECS とし、EC2 は追って導入を検討する ECS Runtime Monitoring 有効化にかかる手間は最小限のものとしたい。自動導入の仕組みがあれば積極的にこれを使いたい ただし有効化にかかる影響範囲のコン トロール を行いたいので Runtime Monitoring を有効にする AWS アカウントは明示的に選択したい Runtime Monitoring を有効にする決定をした AWS アカウント内では全ての ECS クラスタ を一律対象とする という背景があり、以下の要領で有効化を進めることにしました。 Runtime Monitoring 有効化対象のアカウントは明示的に指定する Enabling Runtime Monitoring for multiple-account environments の "For selective active member accounts only" で有効化設定をする GuardDuty エージェントの導入は自動でやってもらう Managing automated security agent for Fargate (Amazon ECS only) に解説があるとおり、Runtime Monitoring 有効化設定を投入した後に ECS サービスを更新すれば自動で GuardDuty エージェントが サイドカー コンテナとして起動してくる なお、ECS on Fargate 構成において GuardDuty エージェントが サイドカー コンテナとして起動する際、 ECS タスク定義への変更は特段発生しません 。 ECS サービス / タスク定義の範囲外の箇所で aws-guardduty-agent- という接頭辞の名称をもつコンテナが自動で起動してくるようになります。詳細は How Runtime Monitoring works with Fargate (Amazon ECS only) の "GuardDuty adds a sidecar container" の節に説明があります。 さて2025年6月現在、MNTSQ で扱う ECS サービスでは ECS on Fargate ECS on EC2 の2種類の構成をとるものがあります。ECS サービスの数で言えば ECS on Fargate が圧倒的に多く、ECS on EC2 は一部用途(主に GPU を利用したい向き)で使われるのみです。 前述ドキュメントに従い ECS Runtime Monitoring の導入をすすめると、ECS on Fargate に関しては ECS サービス更新後に Reviewing runtime coverage statistics and troubleshooting issues に示される手法にて healthy(= GuardDuty エージェントが稼動し Runtime Monitoring の動作も開始した)なことが確認できるようになりました。 実際の runtime coverage 画面。伏字が多い点はご容赦ください いっぽうでこの手法では ECS on EC2 構成の ECS サービスでは coverage が unhealthy のままになってしまう という気付きも得られました。さてどうしたことでしょう。 ECS Runtime Monitoring を ECS on EC2 構成で healthy にする ECS と EC2 のそれぞれで Runtime Monitoring がどのように動作するかは以下ドキュメントに示されています。 How Runtime Monitoring works with Amazon EC2 instances How Runtime Monitoring works with Fargate (Amazon ECS only) いずれも GuardDuty エージェントが動作していることが前提で、EC2 の場合は Systemd ユニットとして、ECS の場合は サイドカー コンテナとして動作します 今回目指したい ECS on EC2 構成の場合でも Fargate ではないとはいえ ECS ではあるので、 サイドカー コンテナとして GuardDuty エージェントは動作するのではないかと考えるのは自然なはずです。少なくとも本稿筆者はそう考えました。しかしながら実際に作業をしてみると 作業 ECS on Fargate で Runtime Monitoring が healthy になった ECS on EC2 で Runtime Monitoring が healthy になった GuardDuty 側で ECS Runtime Monitoring を有効にした × × エージェントの自動導入を有効にした × × ECS タスクの入れ替えをした × × ECS サービスを更新した ○ × ECS コンテナ インスタンス で GuardDuty エージェントを動作させた ×(関係なし) ○ という格好になりました。つまり ECS on EC2 構成の場合は ECS ではなく EC2 側での Runtime Monitoring 対応が必要 という洞察が得られました。 ちなみにこのとき ECS サービス上では サイドカー コンテナとしての GuardDuty エージェントは稼動せず、EC2 インスタンス 上で動作する GuardDuty エージェントがその役目を担っている模様です。 ECS コンテナ インスタンス で GuardDuty エージェントを動作させる どこで何が必要になるかが判れば話は早いです。事前準備としては以下を参照すればよいでしょう。 How Runtime Monitoring works with Amazon EC2 instances Prerequisites for Amazon EC2 instance support 早い話が以下です。 EC2 インスタンス プロファイルで SSM の Run Command によるコマンド実行が許可されるよう権限設定を行う 新しめの Linux カーネル が利用可能な状態で EC2 インスタンス を稼動させる Linux カーネル のバージョンが見落されがちなので注意が必要です。筆者は見落しました。 ECS コンテナ インスタンス を動作させる場合、おおよそのケースでは ECS-optimized AMI が利用されると思います。MNTSQ でも例に漏れず ECS-optimized AMI を使用し ECS コンテナ インスタンス を稼動させています。この ECS-optimized AMI で上記ドキュメントに示される Linux カーネル 5.4 以上のもの *1 を使う方法を考える必要があります。 最も簡単なのは Amazon ECS-optimized Linux AMIs や Retrieving Amazon ECS-optimized Linux AMI metadata で案内されている、 Linux カーネル 5.10 を標準で使用する ECS-optimized AMI に差し替えてしまうというものでしょう。弊社でもこの差し替えを行うことで対応としました。 まとめ ECS on EC2 構成の場合は ECS ではなく EC2 側での Runtime Monitoring 対応が必要 という点、早々に気付ければ話は早かったのですが、「ECS が対象なのだから ECS 向けの有効化作業に何か抜け漏れがあるはずだ」と執着してしまい、試行錯誤をする羽目になりました。Runtime Monitoring に関する AWS 公式ドキュメントのうち ECS に言及されるものはほぼ全て ECS on Fargate が対象の模様で、ECS on EC2 構成に関しての言及が見られない点も少々難儀する箇所だったように思います。プラットフォームを自前で管理する場合の観点を今一度鍛えようと思える機会になりました。 ECS on EC2 構成で ECS Runtime Monitoring が一向に有効化できないという状況にお困りの方の一助となれば幸いです。 MNTSQ 株式会社 SRE 秋本 追伸 ECS on EC2 構成の場合は ECS ではなく EC2 側での Runtime Monitoring 対応が必要 という点、本稿筆者が本作業を行った際には先行情報となるものを見付けられず、本記事が有意義なものになると妄想していました。しかし今本記事を書きつつ探してみたところ、 Turning on Runtime Monitoring for Amazon ECS という ECS のドキュメント(GuardDuty のドキュメントではない) で ECS on EC2 向けの解説がありました。もっと早く知りたかった……。 *1 : ECS-optimized AMI は Amazon Linux 2 か Amazon Linux 2023 がベースになっているので、 Ubuntu や Debian 向けの情報として扱われている内容には触れていません
アバター
こんにちは、MNTSQで アルゴリズム エンジニアをやっている平田です。 MNTSQではAIで企業の契約業務を変革するプロダクトを開発しています。 mntsq.co.jp ところでみなさん、 MCP (Model Context Protocol)使っていますか? 2024年11月にAnthropicがMCPを提唱 してから半年しか経っていないのに、 MCP を取り巻くAIエージェント開発のエコシステムは爆発的なスピードで成長を遂げています。 (実際、この記事を書いている最中にアップデートがあって、何度か書き直しています🫠) 先日 MCP がStreamable HTTPをサポートしたため、MNTSQでも自社プロダクトへの MCP 導入を検討し始めました。 Streamable HTTPではサーバーをステートレスにできるので、 アーキテクチャ がシンプルになり、水平スケーリングが容易になります。これはMNTSQのような SaaS での MCP 活用において非常に重要です。 この記事では、具体的なアプリケーションの実装を通じて、 SaaS での利用を想定した MCP の使い方を学びます。 アプリケーションの主な要件は次の通りです: MCP サーバーをステートレスにする : Streamable HTTPでステートレスな MCP サーバーを構築します。これにより、SSE(Server-Sent Events)よりも アーキテクチャ がシンプルになり、保守性やスケーラビリティが向上します。 生成されたツールの情報を検証する : Function Callingで誤ったリソースにアクセスすることを防ぐため、ツールの自動実行を無効化して、生成されたツールの情報を検証・修正してから実行します。 Gemini API を使う : MNTSQの契約データを扱うには、非常に長いコンテキストウィンドウを持つGeminiが適しています。 LangChainを使わない : LangChainは便利ですが、実際には使用しない機能が依存関係に含まれます。依存関係の 脆弱性 や競合によるメンテナンスコストを下げるため、シンプルかつ軽量な構成を保ちます。 アプリケーションの概要 MCP サーバーで提供するツールは何でも良いので、今回はシンプルにElasticsearchをバックエンドとするRAGアプリケーションを実装します。 リポジトリ この記事で紹介する ソースコード や実行方法はすべて次の リポジトリ にあります。 github.com アーキテクチャ graph TD A[Application] -->|ツール実行| M[MCPサーバー<br/>(Streamable HTTP)] A -->|Function Calling| G[Gemini API] M -->|データ取得/検索| E[Elasticsearch] class A appClass class M mcpClass class G apiClass class E dbClass 処理の流れ sequenceDiagram participant App as Application participant Gemini as Gemini API participant MCP as MCPサーバー Note over App: 初期化 App->>MCP: セッション開始 Activate MCP App->>MCP: ツール一覧取得 (list_tools) MCP-->>App: 利用可能なツール一覧<br/>(get_indices, get_mapping, search) Note over App: 初回リクエスト App->>Gemini: コンテンツ生成リクエスト<br/>(プロンプト + ツール定義) Gemini-->>App: レスポンス loop レスポンスにfunction_callが含まれる限り Note over App: Function Calling App->>MCP: ツール実行 MCP-->>App: ツール実行結果 Note over App: 2回目以降のリクエスト App->>Gemini: 次のコンテンツ生成リクエスト<br/>(履歴 + ツール実行結果) Gemini-->>App: レスポンス end Note over App: 最終回答 App->>App: 最終回答を出力 App->>MCP: セッション終了 Deactivate MCP 例えば、「昨日の売上をカテゴリ別に集計してください。」というプロンプトに対して、次のように動作します。 セッション開始 MCP サーバーからツール一覧を取得 ツール get_indices によりインデックス一覧を取得 ツール get_mapping により kibana_sample_data_ecommerce の マッピング を取得 ツール search によりプロンプトをElasticsearchの DSL に変換して検索を実行 検索結果から回答を生成 回答例: 昨日のカテゴリ別の売上は以下の通りです。 * **Men's Clothing**: 3999.13 * **Women's Clothing**: 3924.91 * **Women's Shoes**: 3360.66 * **Men's Shoes**: 2197.89 * **Men's Accessories**: 1669.72 * **Women's Accessories**: 1292.59 解説 MCP サーバー MCP サーバーの実装は、主に MCP Python SDK の公式ドキュメントを参考にしています。 github.com MCP サーバーは、次の3つのツールを提供します。 get_indices : インデックス一覧取得 get_mapping : マッピング 取得 search : 検索 @ mcp.tool (description= "Elasticsearchで検索を実行するツール" ) def search (index: str , query_body: dict [ str , Any], ctx: Context) -> Any: logger.info( "search tool called" ) logger.debug(f "index: {index}, query: {query_body}" ) es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.search(index=index, body=query_body) return response @ mcp.tool (description= "Elasticsearchのインデックスを取得するツール" ) def get_indices (ctx: Context) -> Any: logger.info( "get_indices tool called" ) es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.indices.get_alias( "*" ) return response @ mcp.tool (description= "Elasticsearchで指定したインデックスのマッピングを取得するツール" ) def get_mapping (index: str , ctx: Context) -> Any: logger.info( "get_mapping tool called" ) logger.debug(f "index: {index}" ) es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.indices.get_mapping(index=index) return response ステートレスを有効化するには FastMCP で stateless_http=True を指定します。 mcp = FastMCP(name= "SearchServer" , stateless_http= True , lifespan=lifespan) MCP サーバーは FastAPI にマウントできます。 app = FastAPI(lifespan=lifespan) app.mount( "/search" , search.mcp.streamable_http_app()) MCP クライアント MCP クライアントの実装は、主に Google Gen AI SDK ( google-genai==1.19.0 )の公式ドキュメントを参考にしています。 ai.google.dev Google Gen AI SDK の公式ドキュメントとの差分は次の3点です。 stdio_client ではなく streamablehttp_client を使う ツールの自動実行を無効化する 最終回答に至るまでFunction Callingを繰り返す Streamable HTTPを使用するため、 streamablehttp_client を使ってセッションを開始します。 async with streamablehttp_client( f "http://{mcp_server_host}:{mcp_server_port}/search/mcp/" ) as ( read_stream, write_stream, _, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() Function Callingのために、 MCP サーバーで提供されるツールの情報と、ツールの自動実行を無効化する設定をGemini API に渡します。 MCP サーバーで提供されるツールの情報は、 tools=[session] で渡します。( google-genai==1.15.0 以前は types.FunctionDeclaration オブジェクトに変換する必要がありました。) ツールの自動実行を無効化する設定は、 automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True) で渡します。無効化する理由は、主に認可の目的で、生成されたツールの情報を検証・修正してから実行するためです。 config = types.GenerateContentConfig( system_instruction=SYSTEM_INSTRUCTION, temperature= 0 , tools=[session], # type: ignore[arg-type] automatic_function_calling=types.AutomaticFunctionCallingConfig(disable= True ), ) ツールの自動実行を無効化しているので、ツールは自分で実行する必要があります。ツールの実行に関する情報(ツール名と引数)は response.candidates[0].content.parts[-1].function_call に出力されます。 Gemini API の公式ドキュメントでは parts[0] を使用していますが、次のように text と function_call が混在する場合があるため parts[-1] を使用するほうが確実です。この順番は Google が保証するものではないですが、ツールの自動実行を無効化するとユーザーの types.FunctionResponse を待ち受ける状態になるので、末尾になると考えるのが自然です。 { " parts ": [ { " text ": " 昨日の売上をカテゴリ別に集計するために、まず売上情報が含まれていそうなインデックスを特定する必要があります。 \n 利用可能なインデックスをリストアップするために、`get_indices()` を実行します。 \n " } , { " function_call ": { " args ": {} , " name ": " get_indices " } } ] } ツールは MCP サーバーが提供しているので、 MCP サーバーにリク エス トしてツールを実行します。 # NOTE : ツールの引数を修正する場合は、 `function_call.args` を直接書き換える # function_call.args["index"] = "kibana_sample_data_ecommerce" result = await session.call_tool(function_call.name, arguments=function_call.args) まとめ この記事では、RAGの実装を通じて、Gemini Function Callingと MCP Streamable HTTPの使い方を学びました。 MCP がStreamable HTTPをサポートしたおかげで、 MCP サーバーの アーキテクチャ がシンプルになりました。 MCP が JSON -RPCを採用しているおかげで、生成されたツールの情報を柔軟に検証・修正することができます。 余談ですが、実装したアプリケーションを実際に動かしてみると、「売上」に対してちゃんと kibana_sample_data_ecommerce インデックスを参照したり、 DSL を教えていないのにElasticsearchという情報だけで正しくクエリを生成したりと、Geminiの生成能力に驚かされます。 MCP を取り巻くエコシステムはまだ発展途上なので、この記事の情報もすぐ古くなってしまうかもしれませんが、みなさんのAIエージェント開発の一助となれば幸いです。 もしMNTSQの仕事にご興味を持っていただけたら、ぜひ以下のページもご覧ください。 careers.mntsq.co.jp note.mntsq.co.jp
アバター
はじめに 構成 実装してみる EventBridge EventBridge API destinationsの設定 オートスケールイベントを拾うためのEventルールを作成する Datadog Slack連携の設定 Event Monitorの設定 おわりに はじめに ECSのオートスケールは、一度設定してしまえば非常に便利ですが、人の手を離れて安定運用に乗せるまでには様々な技術的なハードルがあります。安定運用に入るまでは、現在の設定は妥当なのかを判断するため、オートスケールが発生したことを何らかの方法で人間が把握し、日々改善を重ねていくことが必要不可欠です。そこで今回は、ECSのオートスケールイベントをEventBridgeで拾い、Datadogに連携してSlack通知する仕組みを実装してみます。 構成 全体の構成は以下の通りです。ECS クラスタ やサービスを限定せずに、オートスケールイベントが発生した時に通知を行うような汎用的な仕組みを作っていきます。 オートスケールのイベントをEventBridgeで拾う Datadog Event Management API からDatadogに連携する Datadog Event Monitorにてアラートの設定を行う Slackにメッセージを送信する 実装してみる EventBridge EventBridge API destinationsの設定 EventBridgeでは、 API destinations(API送信先) を設定することによって、 AWS 内のイベントを任意の API に連携することができます。 以下のサンプルコードにて、 API destinationsおよび必要なIAMリソースを作成します。DATADOG_ API _KEYはあらかじめSecrets Managerに登録されているものとしています。 サンプルコード # IAM data "aws_iam_policy_document" "eventbridge_sts" { statement { effect = "Allow" actions = [ "sts:AssumeRole" ] principals { type = "Service" identifiers = [ "events.amazonaws.com" ] } } } data "aws_iam_policy_document" "eventbridge_datadog" { statement { effect = "Allow" actions = [ "events:InvokeApiDestination" ] resources = [ aws_cloudwatch_event_api_destination.datadog.arn ] } statement { effect = "Allow" actions = [ "secretsmanager:DescribeSecret" , "secretsmanager:GetSecretValue" , ] resources = [ "arn:aws:secretsmanager:*:*:secret:events!connection/<DATADOG_API_KEYのシークレット名>/*" ] } } resource "aws_iam_role" "eventbridge_datadog" { name = "eventbridge_datadog_role" assume_role_policy = data.aws_iam_policy_document.eventbridge_sts.json } resource "aws_iam_role_policy" "eventbridge_datadog" { name = aws_iam_role.eventbridge_datadog.name role = aws_iam_role.eventbridge_datadog.id policy = data.aws_iam_policy_document.eventbridge_datadog.json } # あらかじめSecretsManagerにDATADOG_API_KEYを登録しておくこと data "aws_secretsmanager_secret_version" "datadog_api_key" { secret_id = "<DATADOG_API_KEYのシークレット名>" } # Connection resource "aws_cloudwatch_event_connection" "datadog" { name = "datadog-event-api" authorization_type = "API_KEY" auth_parameters { api_key { key = "DD-API-KEY" value = data.aws_secretsmanager_secret_version.datadog_api_key.secret_string } } } resource "aws_cloudwatch_event_api_destination" "datadog" { name = "datadog-event-api" connection_arn = aws_cloudwatch_event_connection.datadog.arn invocation_endpoint = "https://api.datadoghq.com/api/v1/events" http_method = "POST" invocation_rate_limit_per_second = 10 } EventBridgeのマネジメントコンソール左側のツリーから「 API の 送信先 」および「接続」を確認し、"datadog-event- api "のリソースが作成されていたらOKです。 オートスケールイベントを拾うためのEventルールを作成する ECSのオートスケールイベントは、一例としては以下のように、" aws .ecs"の"UpdateService"イベントを呼び元の"userIdentity"で絞ることによって判別することができます。" aws .application-autoscaling"のイベントもありますが、こちらで設定してもスケールの最大値に達した時しかトリガされないようです。ただでさえ動作確認が大変なところなので、気をつけましょう。 { " detail-type ": [ " AWS API Call via CloudTrail " ] , " source ": [ " aws.ecs " ] , " detail ": { " userIdentity ": { " invokedBy ": [ " ecs.application-autoscaling.amazonaws.com " ] } , " eventSource ": [ " ecs.amazonaws.com " ] , " eventName ": [ " UpdateService " ] } } ※今回やってはいけないイベントパターンの設定(スケール最大値に達した時しか発生しない) { " source ": [ " aws.application-autoscaling " ] " detail-type ": [ " Application Auto Scaling Scaling Activity State Change " ] } ターゲットには、先ほど作成した API destinationsを指定します。 また、拾ったイベントの json を API のリク エス トボディによしなに整形するため、 入力トランスフォーマ を以下のように設定します。 入力パスで使用したい変数をバインドし、入力テンプレートでリク エス トボディを作成しています。 ※ 入力パス { " newDesiredCount ": " $.detail.requestParameters.desiredCount ", " service ": " $.detail.requestParameters.service " } ※ 入力テンプレート { " title ": " ECS Cluster Auto Scaling ", " source_type_name ": " amazon ecs ", " alert_type ": " info ", " text ": " %%%`<service>` is auto scaled. \n New desiredCount: <newDesiredCount> %%% ", " tags ": [ " environment:development ", " source:amazon_ecs ", " aws_account:************ ", " new_desired_count:<newDesiredCount> ", " ecs_service_name:<service> " ] } これらの設定のサンプルコードです。 サンプルコード # ECSサービスのオートスケールを検知して通知するイベント resource "aws_cloudwatch_event_rule" "ecs_service_autoscale" { name = "ecs-service-autoscale" event_pattern = jsonencode ( { detail-type = [ "AWS API Call via CloudTrail" ] source = [ "aws.ecs" ] detail = { userIdentity = { invokedBy = [ "ecs.application-autoscaling.amazonaws.com" ] } eventSource = [ "ecs.amazonaws.com" ] , eventName = [ "UpdateService" ] } } ) } # API destinationsをターゲットに設定 resource "aws_cloudwatch_event_target" "ecs_service_autoscale" { rule = aws_cloudwatch_event_rule.ecs_service_autoscale.name arn = aws_cloudwatch_event_api_destination.datadog.arn role_arn = aws_iam_role.eventbridge_datadog.arn input_transformer { input_paths = { newDesiredCount = "$.detail.requestParameters.desiredCount" , service = "$.detail.requestParameters.service" } # jsonencodeが特殊文字をエスケープしてしまうので、ヒアドキュメントを使用する input_template = <<EOF { "title": "ECS Service Auto Scaling", "source_type_name": "amazon ecs", "alert_type": "info", "text": "%%%`<service>` is auto scaled.\n New desiredCount: <newDesiredCount> %%%", "tags": [ "environment:development", "source:amazon_ecs", "aws_account:**********", "event_name:ecs_service_autoscaled", "new_desired_count:<newDesiredCount>", "ecs_service_name:<service>" ] } EOF } } Eventルールが作成されたら、何らかの方法でECSサービスをオートスケールさせてみましょう。DatadogのEvent Explorer ( https :// .datadoghq.com/event/ explorer )にて以下のようなイベントが飛んできていることを確認します。"event_name:ecs_service_autoscaled "でクエリをすれば出てくるはずです。 ※ イベントがDatadogに飛んでこない時 イベントが飛ばない時は以下を行いましょう EventルールのターゲットにCloudWatchを設定 コンソールからEventルールを選択し、「ターゲット」のタブから「編集」でCloudWatchロググループを追加します。追加の権限などは不要なはずです。イベントが正しく拾えていれば、設定したロググループにログストリームが生成されているはずです。また、ロググループからイベントの詳細を確認できるので、入力テンプレートの情報を充実させたい場合などにもこちらを参照します。 Eventルールの API destinationsのターゲットにDLQを設定する コンソールからEventルールを選択し、「ターゲット」のタブから「編集」-> 「ターゲットを選択」のページに移動します。ターゲットは複数設定できますが、 API destinationsのものを選び、「追加設定」から以下のようにDLQを設定します。Datadogの API を読んだ時にエラーが返ってきていた場合は、設定したキューを確認することでレスポンスを確認できます。 Datadog Slack連携の設定 Slack通知を行うためには、あらかじめDatadogとSlack側で簡単な設定が必要です。本記事では設定方法の説明は割愛するので、公式ドキュメントを参考に設定を行ってください。 docs.datadoghq.com Event Monitorの設定 Datadogコンソールの左側のツリーから「Monitors」を選択し、遷移後画面の左上にある「+New Monitor」をクリックします。(もしくはブラウザに https :// .datadoghq.com/monitors/createを入力)作成画面では「Event」を選択します。 1. Define the search query を以下のように設定します クエリの欄に"event_name:ecs_service_autoscaled "を入力 "ecs_service_autoscaled"は入力テンプレートで独自定義したものなので、これで目的のイベントのみを拾います by句に"new_desired_count"と"ecs_service_name"を入力 サービス, スケールイベントごとに通知が飛ぶようにマルチモニター化します。また、ここでby句に指定したTagsのみが通知メッセージに埋め込み可能になります。 2. Set alert conditions を以下のように設定します Trigger when the evaluated value is " above or equal to "に設定 イベント発生を検知したいので データ点1 以上 でWarnアラートを飛ばせるように設定します Alert thresholdを適当な大きな値に設定 オートスケールイベントの通知はWarnレベルで行いたいので、Alertレベルの通知は行わないように、適当に大きな値にします。(本当はInfoレベルにしたいですが、2025/06現在ではDatadogの仕様上不可能と回答をサポートの方にいただきました) Warning thresholdを1に設定 イベントが起きる毎に通知を行いたいので、1に設定します 3. Configure notifications & automations をお好みで設定します 以下は設定例です。{{ }}で囲んである部分には、「EventのTagsに設定されている」かつ「by句で指定している」値のみ埋め込めます。通知をリッチにしたい場合は、 AWS のEventルール側で入力テンプレートの"tags"を充実させ、DatadogのEvent Monitor側のby句でも使用したいTagsを指定しましょう。また、メッセージ全体を{{#is_warning}}{{/is_warning}}で囲えば、Warn状態からの リカバリ 時の通知が飛ばなくなります。 以上で、オートスケールイベントが起きた時に、以下のようにSlackに通知が飛ぶようになりました。 おわりに とりあえずEventBridge -> Datadog -> Slackの通知ができそうだということで仕組みを作成してみましたが、思ったよりも通知がゴチャついてしまったなという印象があります。(通知タイトルに "on ecs_service_name:~"ってついてしまうなど)もしかしたら、以前紹介した Amazon Q Developer in chat applications(旧: AWS Chatbot)を使用した仕組み の方が、スマートに通知できたような気もしています。とはいえオートスケールイベントを通知するという目的は達成しており、監視・モニタリング系の管理をDatadogに集約することには運用上のメリットもあるので、そことの兼ね合いでもあると思います。ここら辺は今後のDatadogのアップデートにも期待ですね。 MNTSQ株式会社 SRE 西室
アバター
はじめに AWS Organizations にて複数の AWS アカウントを管理する場合において、各アカウントへどのようにアクセスするかは色々と検討の余地があると思います。 弊社では長らくこれを以下のような手法で運用していました。 踏み台的用途の AWS アカウントを1つ用意し、そこに作業者が使う IAM ユーザ及びスイッチ用 IAM ロールを用意 各 AWS アカウントにもスイッチ先 IAM ロールを用意 踏み台 AWS アカウント上の IAM ユーザから、作業対象 AWS アカウント上の IAM ロールへスイッチすることで、対象 AWS アカウントへアクセス 今回このあたりを全て IAM Identity Center を用いて以下のような運用に改めました。 IAM ユーザを考慮しなくてよくした IAM ロールも考慮しなくてよくした 全社的な ID 基盤(弊社では Entra ID )を IdP としての AWS への SSO ログイン及び各 AWS アカウントへのアクセスに寄せた これら作業にかかる考慮事項や実際の作業内容、設定完了後の利用感等、コード例を交えつつ解説します。 これまでの状況 構成 弊社における AWS アカウントの状況は以下の通りです。おおむね AWS のホワイトペーパー *1 に則った構成になっているはずです。 AWS Organizations で全ての AWS アカウントを一元管理 AWS アカウントの内訳は以下のとおり AWS Organizations 管理用 AWS アカウント 各 AWS アカウントへスイッチする為の IAM ユーザ / ロール等を管理する為の AWS アカウント プロダクトのワークロードが存在する AWS アカウント 開発環境 QA 環境 ステージング環境 本番環境 etc. 各種内部用途の為の AWS アカウント 全ての AWS アカウントへは 上述 2. に存在する IAM ユーザを使用し、 AWS アカウント間で IAM ロールをスイッチする構成をとっています。これを図にしたものが以下です。 スイッチロール構成。前述 2. を admin に、それ以外の AWS アカウントを member-X (X は適当な数)に、それぞれ置き換えている点に注意 課題 いっぽうで実際の運用上、弊社では以下のような課題感がありました。 IAM ユーザ管理コストが一定発生する 現存 IAM ユーザが在籍者に紐付くものか把握しておく必要がある 入退職や異動の際の IAM ユーザ管理作業が都度発生する IAM ロール(スイッチ元 / 先両方)の管理コストも一定生じる 現在は principal tag を使った制御 *2 にてスイッチ先 AWS アカウントおよび IAM ロールの制限を実施している 状況に応じ特殊な権限設定 / スイッチ先アカウント設定が必要になるケースがあり、タグ設計を検討する場合がある スイッチロール時の煩雑な操作を緩和するために外部ツールに頼る必要がある 実際に使っているもの Web ブラウザ: AWS Extended Switch Roles CLI : aws-vault 外部ツールを使うこと自体が問題なのではなく、ツールに依存性が生じる事(= コントールが難しい領域が発生する事)に課題感がある 3. については AWS が公式に提供 / メンテナンスをしているツール群のみの利用とできるのが理想です。 また 1. については、現在弊社では全社的な ID 基盤として Entra ID を使用しており、ここで管理されるユーザ情報が AWS で利用出来れば IAM ユーザを管理する必要がなくなるため、かなり嬉しくなれます。 ただし IAM ユーザのみを考慮した場合 2. がクリアできず、このあたりも吸収できるようなうまい策を考えなければなりません。 IAM Identity Center を導入する 前項で述べた課題感にほぼ対処できる策として IAM Identity Center があります。詳細はリンク先に譲るとして、これは早い話が AWS 上で利用できる ID 基盤です。 AWS Organizations とも特段複雑なことをせずとも連携できます。これを従来の IAM ユーザ / ロールの代わりに用いることで以下が達成できます。 外部 IdP をユーザ / グループの情報ソースとして利用が可能。当該ユーザを使用しての SAML 認証による SSO も実現可 IAM ユーザの管理が不要 IdP から連携されたユーザを用いることでユーザ管理の責務は IdP に移る IAM ロールの考慮も不要 IAM Identity Center 内には権限セット *3 という概念があり、どのような権限を認可するかを IAM ポリシドキュメントの形で設定可能 権限セットの効力範囲は AWS アカウント別に設定可能 実際に採った構成を以下に示します。 SSO 構成 作業 作業は以下の段取りで進める格好としました。 Entra ID と IAM Identity Center との連携設定投入 権限セットの整備 Entra ID から IAM Identity Center に連携されてきたユーザに対しての権限セット / AWS アカウント紐付け ユーザとグループが連携されているがグループは今回考慮していない。後述 1. のみ手作業での対応とし、他は Terraform にて IaC した状態で作業しています。 1. Entra ID と IAM Identity Center との連携設定投入 基本的には Configure SAML and SCIM with Microsoft Entra ID and IAM Identity Center に従うことで作業は完了となります。弊社では今回以下を前提としました。 Entra ID および AWS アカウントについては既存構成をそのまま使う Entra ID 側でのユーザ / グループへの変更が自動で IAM Identity Center へ連携されてくる構成 *4 とする 実際には前掲ドキュメント中の以下を適宜読み替えての実施としました。 Step 3: Configure and test your SAML connection Step 4: Configure and test your SCIM synchronization 本来このあたりも Terraform にて管理できるのが理想ですが、2025年5月現在 Terraform では IAM Identity Center 組織 インスタンス *5 の構成管理が出来ないため、本作業は手作業での実施としています。 2. 権限セットの整備 IAM Identity Center を利用してアクセスした各 AWS アカウント内でどういった操作を認可するかを定義するものが権限セット *6 です。利用者が IAM Identity Center 経由で各 AWS アカウントへアクセスする際に使う IAM ロール / ポリシの定義、という感覚で大方問題ないはずです(当方もその理解でいます)。 実際の定義方法も IAM Identity Center 側で事前に定義された権限セットから選択 IAM ポリシにおける AWS マネージドなものに大方対応する 既存 / 新設した IAM ポリシを割り当てる形式で設定 といった形で、概念としては IAM ロールの定義に似ています。弊社の事例では事前定義の権限セットで充分であった為、こちらを使っての設定としています。 ただしここには微妙に落とし穴があり、現在 IAM Identity Center が設定を推奨している identity-enhanced console session *7 を有効にする場合、事前定義の権限セットを素直に使うのみでは AWS アカウントへのアクセス時に HTTP 400 が発生して難儀することになります。 対処としては Granting permissions to use identity-aware console sessions で挙げられている IAM ポリシ ステートメント を権限セット側に設定すれば解決しますが、当該ドキュメントのコード例は resource に AWS アカウント ID を指定する厳格なもので、不特定多数の AWS アカウントへのアクセスが想定される IAM Identity Center とは食い合わせが悪いです。弊社では { " Version ": " 2012-10-17 ", " Statement ": [ { " Effect ": " Allow ", " Action ": " sts:SetContext ", " Resource ": " * " } ] } のように、少々ゆとりを持たせた ステートメント を使うようにしました。 3. Entra ID から IAM Identity Center に連携されてきたユーザに対しての権限セット / AWS アカウント紐付け Entra ID と IAM Identity Center との連携が無事に済むと、連携対象とした Entra ID 上のユーザとグループが IAM Identity Center 上で見えるようになります。これらに権限セットと操作対象 AWS アカウントとを設定してやれば晴れて Entra ID 側情報を利用しての AWS への SSO アクセスが可能となります。 ユーザとグループのどちらに権限セット / AWS アカウント割り当て設定を行うかは運用形態によって検討の余地があると思います。弊社では ユーザ単位での設定 としました。というのも以下事情が有ったためです。 Entra ID は社内の別部門が管理しており、IAM Identity Center と Entra ID とが密結合になるような状況は避けたかった Entra ID グループ単位での権限設計ではカバーが難しい例外的な設定が求められるケースが一部にあり Entra ID ユーザ単位での設定に利があった Entra ID ユーザ個別に権限セットや AWS アカウントの設定をおこなう煩雑さはコードレベルで可能な限り吸収するようにしました。 コード例 おまたせしました。IAM Identity Center で権限セットと AWS アカウントとを IAM Identity Center ユーザに紐付けする Terraform コード例を示します。実際に運用しているコードを元にしていますが適宜フィクションを交えての内容となる点はご容赦ください。 main.tf /* AWS Organizations で管理される AWS アカウント一覧を得るためのもの */ data "aws_organizations_organization" "main" {} /* IAM Identity Center インスタンスに対し必要な設定が行われる これは Terraform リソースでの管理が難しいので、設定は AWS マネジメントコンソール上から行い、Terraform からは参照に留める */ data "aws_ssoadmin_instances" "main" {} /* Identity-aware console sessions を有効にしている場合、以下を権限セットに設定する必要がある これが無いと IAM Identity Center 経由で AWS アカウントへログインができない */ data "aws_iam_policy_document" "identity_aware_sessions" { statement { actions = [ "sts:SetContext" ] resources = [ "*" # 不特定多数の AWS アカウントにログインするのでリソースを絞るのが難しい ] } } # 権限セットに割り当てる IAM ポリシ定義を取得 data "aws_iam_policy" "main" { for_each = local.permission_sets name = each.value.managed_policy_name } # 権限セットを定義する。権限セットへの IAM ポリシの割り当てもここでやる resource "aws_ssoadmin_permission_set" "main" { for_each = local.permission_sets name = each.key description = "Allow permissions defined as AdministratorAccess" instance_arn = tolist (data.aws_ssoadmin_instances.main.arns) [ 0 ] # セッションを8時間維持する。ISO 8601 の記法で記載 session_duration = "PT8H" } resource "aws_ssoadmin_managed_policy_attachment" "main" { for_each = local.permission_sets instance_arn = tolist (data.aws_ssoadmin_instances.main.arns) [ 0 ] managed_policy_arn = data.aws_iam_policy.main [ each.key ] .arn permission_set_arn = aws_ssoadmin_permission_set.main [ each.key ] .arn } resource "aws_ssoadmin_permission_set_inline_policy" "main" { for_each = local.permission_sets inline_policy = data.aws_iam_policy_document.identity_aware_sessions.json instance_arn = tolist (data.aws_ssoadmin_instances.main.arns) [ 0 ] permission_set_arn = aws_ssoadmin_permission_set.main [ each.key ] .arn } /* SCIM 経由で連携された Entra ID ユーザへ権限セットと AWS アカウントとを紐付ける 紐付け関係は locals.tf を参照 */ data "aws_identitystore_user" "main" { for_each = local.users identity_store_id = tolist (data.aws_ssoadmin_instances.main.identity_store_ids) [ 0 ] alternate_identifier { unique_attribute { attribute_path = "UserName" attribute_value = "$ { each.key } @$ { local.domain_name [ each.value.type ]} " } } } resource "aws_ssoadmin_account_assignment" "main" { # locals.tf 側 user_account_mapping も参照のこと for_each = tomap ( { for element in local.user_account_mapping : "$ { element.user } @$ { element.aws_account_id } " => element } ) instance_arn = tolist (data.aws_ssoadmin_instances.main.arns) [ 0 ] /* locals.tf 内で定義される local.users で admin = true が設定されている場合は Administrator 一択 そうでない場合は同 local.permission で定義されている権限セットを指定 */ permission_set_arn = ( try (each.value.admin, false ) ? aws_ssoadmin_permission_set.main [ "Administrator" ] .arn : aws_ssoadmin_permission_set.main [ local.permissions [ each.value.type ][ each.value.role ]] .arn ) principal_id = data.aws_identitystore_user.main [ each.value.user ] .id principal_type = "USER" target_id = each.value.aws_account_id target_type = "AWS_ACCOUNT" } locals.tf locals { permission_sets = { Administrator = { description = "Allow permissions defined as AdministratorAccess" managed_policy_name = "AdministratorAccess" } Developer = { description = "Allow permissions defined as PowerUserAccess" managed_policy_name = "PowerUserAccess" } ReadOnly = { description = "Allow permissions defined as ReadOnlyAccess" managed_policy_name = "ReadOnlyAccess" } } } locals { # 職責の一覧 roles = { sre = "sre" # SRE swe = "swe" # 開発(フロントエンド / バックエンド) algo = "algo" # Algo qa = "qa" # QA cre = "cre" # CRE pdm = "pdm" # PdM sales = "sales" # セールス cs = "cs" # CS } # 雇用区分の一覧 types = { employee = "employee" # 正社員 partner = "partner" # 業務委託 } /* Entra ID 上で設定されるユーザ名で使用されるドメイン部 もちろん実際に MNTSQ 内で使われるものとは一致しない */ domain_name = { employee = "example.com" partner = "partner.example.com" } } locals { environment = { production = [ /* プロダクトのワークロードが載っている AWS アカウントで本番環境として扱うべきものを列記 AWS アカウント名が期待される */ ] non_production = [ /* プロダクトのワークロードが載っている AWS アカウントで本番環境として扱う必要は無いものを列記 AWS アカウント名が期待される */ ] } aws_accounts = { # AWS Organizations 管理下にある全ての有効な AWS アカウント for account in data.aws_organizations_organization.main.accounts : account.name => account.id if account.status == "ACTIVE" } } /* 雇用区分と職責とによってどの権限セットをアサインするかを決定する 内訳 - permissions:職責と権限セットとの対応 - mappings:職責と AWS アカウントとの対応 - users:ユーザ(Entra ID 管理) / 雇用区分 / 職責の対応 */ locals { permissions = { /* 雇用区分と職責とで異なる権限セットを設定できるようにする: - 雇用区分:local.types を参照 - 職責:local.roles を参照 */ employee = { sre = "Administrator" swe = "Developer" algo = "Developer" qa = "Developer" cre = "Developer" pdm = "Developer" sales = "ReadOnly" cs = "ReadOnly" } partner = { sre = "Developer" swe = "Developer" algo = "Developer" qa = "Developer" } } mappings = { /* 職責によって IAM Identity Center 経由でアクセス可能な AWS アカウントを制御する 職責の定義は local.roles を参照 */ employee = { sre = values (local.aws_accounts) swe = concat ( local.environment.production, local.environment.non_production ) algo = concat ( local.environment.production, local.environment.non_production ) qa = concat ( local.environment.production, local.environment.non_production ) cre = concat ( local.environment.production, local.environment.non_production, ) pdm = concat ( local.environment.production, local.environment.non_production, ) sales = local.environment.production, cs = local.environment.production, } partner = { sre = local.environment.non_production, swe = local.environment.non_production algo = local.environment.non_production qa = local.environment.non_production cre = local.environment.non_production, } } users = { /* **Entra ID で** 管理されるユーザであって AWS を利用する者を宣言する ユーザ名はドメイン名を省く形で指定する ユーザ名をキーとし、値には以下をもつ map を宣言する: - 雇用区分 (type):local.types を参照 - 職責 (role):local.roles を参照 - 操作許可 AWS アカウント内で Administrator 権限が必要な場合は admin を true に設定 */ # # <名>.<姓> でユーザ名が定義されるとする "taro.sre" = { type = local.types.employee, role = local.roles.sre } , "hanako.swe" = { type = local.types.employee, role = local.roles.swe, admin = true } , "jiro.swe" = { type = local.types.partner, role = local.roles.swe } , "saburo.qa" = { type = local.types.employee, role = local.roles.qa, } , "shiro.algo" = { type = local.types.employee, role = local.roles.algo } , "goro.sales" = { type = local.types.employee, role = local.roles.sales } , } } /* IAM Identity Center 上でのユーザ / 権限セット / AWS アカウントとの紐付けには aws_ssoadmin_account_assignment というリソースを使う https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/ssoadmin_account_assignment このリソースはユーザと AWS アカウントを一度にそれぞれ1つしか設定できない 窮余の策として以下値をもつ list を定義し、当該リソースの for_each では "USERNAME@AWS_ACCOUNT_ID" をキーとする map として処理する: - ユーザ名 (user)(例:"hoge.fuga") - 職責 (role)(例:"sre") - 雇用区分 (type):"employee"(正社員)または "partner"(業務委託) - AWS アカウント ID (aws_account_id) 参考:https://developer.hashicorp.com/terraform/language/functions/flatten */ locals { user_account_mapping = flatten ( [ for user_key, user in local.users : [ for aws_account_id in local.mappings [ user.type ][ user.role ] : { user = user_key role = user.role type = user.type admin = try (user.admin, false ) aws_account_id = aws_account_id } ] ] ) } 使用感 Web ブラウザ IAM Identity Center のセットアップが無事に完了すると IAM Identity Center 設定時に指定した AWS アクセスポータル *8 の URL から SSO ログインが行えます。 権限セットや AWS アカウントが割り当ていない状態でもアクセスは可能ですが、 AWS アカウントへはアクセスできません。一方で両者が設定されている場合は SSO ログイン後に AWS アカウントの選択肢が表示され、設定された権限セットでもってのアクセスが可能となります。 実際に AWS アクセスポータルへアクセスした際の状況は以下のようになります。伏字が多い点はご寛容いただければと思います。 権限セット / AWS アカウント割当前 権限セット / AWS アカウント割当後 AWS アクセスポータルから各 AWS アカウントへアクセスした後はいつものマネジメントコンソール上での操作になります。アクセス先を切り替える為には AWS アクセスポータルへの都度アクセスが必要ですが、都度アクセスの面倒を避けたい向きに サードパーティ の拡張がいくつか提供されています。本稿筆者は AWS SSO Extender を使用しています *9 。 CLI 新しめの AWS CLI を使っている限りにおいて Configuring IAM Identity Center authentication with the AWS CLI に従い設定を済ませれば、以後は aws sso login --profile <プロファイル名> を都度実行することで認証が済み、 aws コマンドでの操作が可能です。これはクレデンシャル管理が必要であった IAM ユーザ利用に比べ大きなメリットと言えるでしょう。 AWS アカウントの切り替えも --profile オプションの指定ないしは 環境変数 AWS_PROFILE の宣言で済みます。 AWS CLI の設定内容については aws configure sso を使い都度設定してゆく方法もありますが、弊社では各職責毎にテンプレート化した設定ファイルを用意し、各作業者が利用している PC に設定を保存してもらう方針をとりました。 ほか Terraform に関しても AWS CLI 向けの設定が済んでいれば aws sso login が完了している前提にて特段支障なく操作が可能です。ただし Terraform 1.6.0 以前では AWS CLI 用の設定に改変を加える必要があり *10 、弊社でも当初この事象を踏みました。最終的には Terraform バージョンを最新版(作業時点で 1.11.4)にしてしまうことで対処としました。 おわりに 複数の AWS アカウントを管理する構成において IAM ロールをスイッチする運用を改め、IAM Identity Center による一元的かつ柔軟な AWS アカウント横断のアクセスを可能とするための作業とその後の所感について扱いました。 現在弊社では IAM Identity Center による AWS への SSO ログインの利用浸透を全社で行っており、これが完了した暁には個々人向けに払い出していた IAM ユーザおよびスイッチロール用の IAM ロールの削除をし、IAM Identity Center 経由での AWS アクセスに完全に移行するよう計画しています。 ユーザから見た場合に各 AWS アカウントへのアクセスが透過的になるのも IAM Identity Center 導入での嬉しさのひとつですが、IAM Identity Center 自体が IdP としての利用も可能であるという点から、SRE 主体での他のサービスへの SSO 化の試みについても着手し易くなってきました。今後は社内で独自のユーザ管理体系が存在する諸々のサービスを着実に SSO 化し、利用者の利便性向上やセキュアな体制構築に繋げてゆこうと計画しています。 IAM Identity Center と Entra ID との組み合わせ自体は然程珍しいものではなく、Web 上にも同様の事例が多数みられます。本稿がそれらのひとつとして数えられ、ひとつの事例としてお役に立てれば幸いです。 MNTSQ 株式会社 SRE 秋本 *1 : Organizing Your AWS Environment Using Multiple Accounts *2 : Controlling access to and for IAM users and roles using tags に詳細があります。弊社では IAM ユーザに所定のタグを設定し、スイッチ先 IAM ロールでは trust policy 内でタグを評価し、タグのキーと値とが上限に合致する場合にスイッチを認可する、といった設定で運用している *3 : Manage AWS accounts with permission sets *4 : 自動プロビジョニング。 Provisioning an external identity provider into IAM Identity Center using SCIM が詳しい *5 : ここ が詳しい。IAM Identity Center にはその設定や効力範囲を制御するための インスタンス という概念があり、 AWS Organizations 管理者アカウントにひもづく インスタンス を組織 インスタンス 、同メンバーアカウントにひもづく インスタンス をアカウント インスタンス という。組織 インスタンス で IdP 連携や SSO 時の設定等を管理する *6 : Manage AWS accounts with permission sets *7 : 本文中で後述している "identity-aware" の語が本記事にかかる設定作業を実際に行っていた2025年4月時点では使われていたが、2025年5月時点では "identity-enhanced" に変わっていた *8 : Using the AWS access portal *9 : 外部ツールに頼らなくしたいという動機はどこにいったのだというツッコミはその通りです…… *10 : https://sadayoshi-tada.hatenablog.com/entry/2023/10/06/001405 が詳しいです。事例公開多謝です
アバター
はじめに Redis ログについて slow ログ engine ログ 設定方針 構成 Datadog Logs 向けに slow ログを修正する インフラ構成 Datadog Logs の風景 おわりに 参考 はじめに 弊社では Web アプリケーションや非同期処理用ジョブの一時データなどを取り扱う際に ElastiCache Redis をインメモリデータベースとして利用しています。 これまで ElastiCache Redis 自体の稼動状況の把握には CloudWatch メトリクスを中心とした数値ベースの情報のみを利用していましたが、今回ここにログ情報を加えることにしました。ログ取得に際し必要となった作業について扱います。 簡単のため、以降は特に断わりなく "ElastiCache Redis" を単に "Redis" と呼びます。 Redis ログについて Redis ログには slow / engine の2種類があります。ElastiCache Redis の場合、ログのフォーマットは JSON およびプレーンテキストの2種類を選べますが、以下では JSON ログを前提とします。 なお slow ログは Datadog Logs へのログ送信に際し注意が必要 です(後述)。 slow ログ 「遅い処理」を記録するものです。詳細は Redis 公式のドキュメント を参照ください。 何をもって遅い処理とするかは ElastiCache Redis の場合、パラメタグループ内 slowlog-log-slower-than にてマイクロ秒単位で決定されます。また slow ログの収集は ElastiCache Redis 6.0 以上のバージョンから可能です。 ログ例は ElastiCache 公式ドキュメント で示される通り、以下のようになります。S3 へ保存されるログもこの形式になります。記録対象のログ1件に対し JSON オブジェクトが1件対応します。 { " CacheClusterId ": " logslowxxxxmsxj ", " CacheNodeId ": " 0001 ", " Id ": 296 , " Timestamp ": 1605631822 , " Duration (us) ": 0 , " Command ": " GET ... (1 more arguments) ", " ClientAddress ": " 192.168.12.104:55452 ", " ClientName ": " logslowxxxxmsxj## " } 各要素は以下のような意味合いになります。前掲ドキュメントの内容を意訳し、適宜補足を加えています。 CacheClusterId:Redis クラスタ 名 CacheNodeId:Redis ノード名(上記 クラスタ 内にあるはず) Id:各ログ固有の識別子 Timestamp: 当該ログが Redis 内で記録された際の 日時 Datadog でログが処理された時刻とは異なるので注意 Duration (us):ログで記録された処理に要した処理時間(マイクロ秒) Command:Redis で処理されたコマンドの内容 ClientAddress:Redis にコマンドを発行したクライアントの IP 通常これは ECS タスクなどが持つ VPC 内 IP になるはず ClientName:Redis にコマンドを発行したクライアントの名前 engine ログ Redis そのものの稼動状況を示すログになります。engine ログの収集は ElastiCache Redis 6.2 以上のバージョンから可能です。これも ElastiCache 公式ドキュメント から例を引用します。S3 側に保存される状況は slow ログと同様です。 例: { " CacheClusterId ": " xxxxxxxxxzy-engine-log-test ", " CacheNodeId ": " 0001 ", " LogLevel ": " VERBOSE ", " Role ": " M ", " Time ": " 12 Nov 2020 01:28:57.994 UTC ", " Message ": " Replica is waiting for next BGSAVE before synchronizing with the primary. Check back later " } 内容については以下となります。 CacheClusterId:Redis クラスタ 名 CacheNodeId:Redis ノード名(上記 クラスタ 内にあるはず) LogLevel:ログの重要性の度合い VERBOSE:雑多なやつ NOTICE:見ておくほうがよいやつ WARNING:真剣に見ておくほうがよいやつ Time: 当該ログが Redis 内で記録された際の 日時 Datadog でログが処理された時刻とは異なるので注意 Role:ログを出力した主体 M:プライマリノード S:レプリカ( セカンダリ )ノード C:ディスクにデータが書き出されたとき より詳細には RDB / AOF 処理が発生したとき( Redis 公式ドキュメントの "Redis persistence" が詳しい) X:フェイルオーバが発生したとき より詳細には Redis Sentinel が動いたとき( Redis 公式ドキュメントの "High availability with Redis Sentinel " が詳しい) 設定方針 既に稼動中の Redis クラスタ ( レプリケーション グループ)はそのまま利用し、ログ取得のための設定を追加で入れます。 ログの出力先については S3 バケット 及び Datadog Logs を選定しました。これは以下のような事情によります。 Datadog Logs:弊社ではログ管理(集約 / 検索 / 監視 etc.)を Datadog で一元化しており、Redis ログも Datadog 上で取り扱えると都合がよい S3:基本的には Datadog 上でログを取り扱うが、Datadog に万一の事態が生じた場合に Athena 等でログを扱えるようにしておきたい いっぽうで ElastiCache ログ取得に関する AWS のドキュメント によればログ出力先には Data Firehose と CloudWatch Logs の2つの選択肢があります。どちらが適切かという話になりますが、以下のとおり Data Firehose を使っておくのがベターなようです。 Redis ログ出力先 メリット デメリット Data Firehose ● S3 をバックアップ先に指定することで S3 へのログ転送も達成可( ドキュメント ) ● Datadog が公式にData Firehose によるログ転送を解説している( ドキュメント ) ● CloudWatch Logs へはログが飛ばない CloudWatch Logs ● S3 と Datadog Logs に加え CloudWatch Logs でのログ取扱が可能になる(CloudWatch Logs Insights が使える) ● S3 / Datadog Logs 向けに サブスクリプション フィルタと別途ログ送信用リソースの用意が必要になる よって本稿では Redis ログを Data Firehose へ出力し、Data Firehose から S3 および Datadog Logs へログを送信するような構成をとることとしました。 構成 構成図は以下の通りです。slow / engine 各ログは最終目的地が同一の S3 バケット / Datadog Logs であることから、Data Firehose ストリームは特に分けない構成としました。S3 バケット は Redis ログ用に新設したものを使います。 構成図 いきなり Data Firehose に Lambda 関数がくっつく構成となっていますが、これには事情があります。 Datadog Logs 向けに slow ログを修正する 当初、上記構成のうち Lambda 関数を設けず(つまり各ログを無加工の状態で) Redis ログを送信した際、slow ログのみ Datadog Logs でログとして取り扱えない状況となりました。Data Firehose から Datadog へのログ送信は問題無い様子だったので、Datadog がログを正しく処理できていない模様です。 トラブルシュートに難儀したのですが、ヒントは Datadog ドキュメントのうち Log Management -> Log Configuration -> Pipelines にありました。関連する箇所を同ドキュメントから要約します: JSON ログ中に Timestamp という attribute がある場合、Datadog Logs はその情報をログの時刻情報として扱う 取り扱える日付のフォーマットは ISO8601 / ミリ秒精度の UNIX 時間 / RFC3164 JSON ログ中に存在する Message の内容をログメッセージ本体として扱う ここで各ログの内容を見てみましょう。前掲のログ例を引用します。 slow ログ { "CacheClusterId": "logslowxxxxmsxj", "CacheNodeId": "0001", "Id": 296, "Timestamp": 1605631822, "Duration (us)": 0, "Command": "GET ... (1 more arguments)", "ClientAddress": "192.168.12.104:55452", "ClientName": "logslowxxxxmsxj##" } engine ログ { "CacheClusterId": "xxxxxxxxxzy-engine-log-test", "CacheNodeId": "0001", "LogLevel": "VERBOSE", "Role": "M", "Time": "12 Nov 2020 01:28:57.994 UTC", "Message": "Replica is waiting for next BGSAVE before synchronizing with the primary. Check back later" } 2ログを見比べると以下がわかります。 slow ログ Timestamp に UNIX 時間( ミリ秒精度ではない )と解釈できる値が入っている Message が無い Command に Redis へ発行されたコマンドが記録されている engine ログ Timestamp は無い Message がある engine ログが Datadog Logs で処理できているので、slow ログを engine ログと同様の内容にするのが手っ取り早いでしょう。つまり以下のようにすると光明が見えそうです。 { " CacheClusterId ": " logslowxxxxmsxj ", " CacheNodeId ": " 0001 ", " Id ": 296 , " Time ": 1605631822 , # 元 ` Timestamp ` " Duration (us) ": 0 , " Message ": " GET ... (1 more arguments) ", # 元 ` Command ` " ClientAddress ": " 192.168.12.104:55452 ", " ClientName ": " logslowxxxxmsxj## " } Data Firehose はストリーム内を流れるデータを Lambda 関数を使い処理できる機能があります。弊社では以下のような TypeScript コードを Node.js ランタイムを使う Lambda 関数として整備し、上記変換処理を行わせることにしました。 コードはこちらを参照 import { Buffer } from 'buffer' ; import { DateTime } from 'luxon' ; console . log ( 'Loading function' ); interface InputRecord { recordId : string ; data : string ; } interface Event { records : InputRecord [] ; } interface OutputRecord { recordId : string ; result : string ; data : string ; } interface Output { records : OutputRecord [] ; } export const handler = async ( event : Event ): Promise < Output > => { const output = event.records. map (( record , index ): OutputRecord => { const decoded = Buffer .from(record.data, 'base64' ). toString ( 'utf-8' ); console .log( `Decoded data: ${ decoded } ` ); /* * event として Data Firehose から受け取ったペイロードが JSON ログでない場合は後続処理をさせない * ElastiCache ログは Data Firehose へ JSON の形式で送るよう設定しているので通常はここに落ちないはず */ let parsed: any ; try { parsed = JSON . parse (decoded); } catch (error) { console .error( `Error parsing JSON for record ${ record.recordId } :` , error); return { recordId : record.recordId, result : 'Ok' , data : record.data, } ; } /* * Timestamp というエントリは Datadog Logs が処理する上で不適。以下のように直す * (参考:https://docs.datadoghq.com/logs/log_configuration/pipelines/?tab=date) * * - Time というエントリに改名 * - 元々 Timestamp として記録されていた epoch 秒を engine ログと同じような日付フォーマットの文字列に変換 * - engine ログと互換性のある内容にしたい */ if ( 'Timestamp' in parsed) { const dt = DateTime.fromSeconds(parsed.Timestamp, { zone : 'utc' } ); parsed.Time = dt.toFormat( "dd MMM yyyy HH:mm:ss.SSS 'UTC'" ); delete parsed.Timestamp; } /* * Message というエントリが無いと Datadog Logs 上でログとして受け付けてくれない * 拾いたい情報である "Command" の内容を Message に改名する * これも engine ログと同様の措置 * (参考:https://docs.datadoghq.com/logs/log_configuration/pipelines/?tab=message) */ if ( 'Command' in parsed) { parsed.Message = parsed.Command; delete parsed.Command; } console .log( `Transformed data: ${ JSON . stringify (parsed) } ` ); const encoded = Buffer .from( JSON . stringify (parsed)). toString ( 'base64' ); return { recordId : record.recordId, result : 'Ok' , data : encoded, } ; } ); console .log( `Processing completed. Successful records: ${ output. length} ` ); return { records : output } ; } ; インフラ構成 Terraform コード例を示します。実際に利用しているコードから要所を抜粋 / 省略したものとなります。 コードはこちらを参照 # Data Firehose 内でのログ処理に使用する Lambda 関数のソースコード # Terraform とは別の経路で事前に S3 バケットへアップロードしておく data "aws_s3_object" "lambda_source_archive" { bucket = var.bucket_name key = "transform-elasticache-logs/artifact.zip" } # Lambda 関数にアタッチされる IAM ロールで使用する IAM ポリシ # 当該関数内ではシンプルに JSON データをいじるのみで AWS サービスを触らない # 最低限の権限(CloudWatch Logs へのログ書き出し)をもつ IAM ポリシだけ参照できるようにする data "aws_iam_policy" "lambda_basic_role" { name = "AWSLambdaBasicExecutionRole" } # Lambda 関数本体 module "lambda_transform_elasticache_logs" { source = "terraform-aws-modules/lambda/aws" version = "7.20.2" function_name = "$ { var.env } -transform-elasticache-logs" description = "Lambda function for transforming slow log from Elasticache to appropriate format suitable for Datadog Logs" handler = "main.handler" runtime = "nodejs22.x" timeout = 60 publish = true attach_policy = true create_package = false policy = data.aws_iam_policy.lambda_basic_role.arn s3_existing_package = { bucket = var.bucket_name key = "transform-elasticache-logs/artifact.zip" version_id = data.aws_s3_object.lambda_source_archive.version_id } assume_role_policy_statements = { account_root = { effect = "Allow" actions = [ "sts:AssumeRole" ] principals = { account_principal = { type = "AWS" identifiers = [ "arn:aws:iam::$ { data.aws_caller_identity.self.account_id } :root" ] } } } } tags = { Name = "$ { var.env } -transform-elasticache-logs" } } # Data Firehose 向け principal を assume する data "aws_iam_policy_document" "elasticache_log_assuming" { statement { actions = [ "sts:AssumeRole" ] effect = "Allow" principals { type = "Service" identifiers = [ "firehose.amazonaws.com" ] } } } # Data Firehose が所定の S3 バケットにログを流し込めるようにする data "aws_iam_policy_document" "elasticache_log_permission" { statement { effect = "Allow" actions = [ "s3:AbortMultipartUpload" , "s3:GetBucketLocation" , "s3:GetObject" , "s3:ListBucket" , "s3:ListBucketMultipartUploads" , "s3:PutObject" , ] resources = [ aws_s3_bucket.main.arn, "$ { aws_s3_bucket.main.arn } /*" , ] } # ログ送信のためには CloudWatch Logs 関連の権限も必要そう statement { effect = "Allow" actions = [ "logs:CreateLogStream" , "logs:PutLogEvents" , "logs:CreateLogGroup" , ] resources = [ "*" ] } } # ElastiCache ログを Data Firehose 経由で配信する為の IAM ロール resource "aws_iam_role" "main" { name = "$ { var.env } -elasticache-log-role" assume_role_policy = data.aws_iam_policy_document.elasticache_log_assuming.json } # ElastiCache ログを Data Firehose 経由で配信する為の IAM ロールにポリシーをアタッチ resource "aws_iam_role_policy" "main" { name = "$ { var.env } -elasticache-log-policy" role = aws_iam_role.main.id policy = data.aws_iam_policy_document.elasticache_log_permission.json } # ログ処理用 Data Firehose ストリーム resource "aws_kinesis_firehose_delivery_stream" "main" { depends_on = [ aws_s3_bucket.main, ] name = "$ { var.env } -redis-log" # S3 と Datadog Logs とにログを送信する # S3 にログを送るのみであれば extended_s3_configuration が使えるが、その場合 Datadog 向けの設定と共存不可 destination = "http_endpoint" http_endpoint_configuration { url = "https://aws-kinesis-http-intake.logs.datadoghq.com/v1/input" name = "Datadog Log" # 以下 `DATADOG_API_KEY` は適切な値に置き換える access_key = DATADOG_API_KEY role_arn = aws_iam_role.main.arn # 今回は Datadog Logs と同内容のログを S3 にも置きたいという目的につき、全ログを S3 に置く設定とした s3_backup_mode = "AllData" request_configuration { content_encoding = "GZIP" # これら値を設定しておくことで Datadog Logs 上でタグが付与される common_attributes { name = "env" value = var.env } } s3_configuration { role_arn = aws_iam_role.main.arn bucket_arn = aws_s3_bucket.main.arn prefix = "redis/" } # ログ整形用 Lambda 関数定義 processing_configuration { enabled = "true" processors { type = "Lambda" parameters { parameter_name = "LambdaArn" parameter_value = module.lambda_transform_elasticache_logs.lambda_function_arn } } } } tags = { "LogDeliveryEnabled" = "true" } } # Redis クラスタ resource "aws_elasticache_replication_group" "main" { depends_on = [ aws_kinesis_firehose_delivery_stream.elasticache_log, ] replication_group_id = "$ { var.env } -redis" description = "Redis replication group" engine = var.elasticache.engine engine_version = var.elasticache.engine_version node_type = var.elasticache.instance_type port = var.elasticache.port subnet_group_name = var.elasticache.subnet_group_name parameter_group_name = aws_elasticache_parameter_group.main.name num_cache_clusters = var.elasticache.instance_num at_rest_encryption_enabled = true apply_immediately = true security_group_ids = var.elasticache.security_group_ids # slow ログ(後述) log_delivery_configuration { destination = aws_kinesis_firehose_delivery_stream.main.name destination_type = "kinesis-firehose" log_format = "json" log_type = "slow-log" } # engine ログ(後述) log_delivery_configuration { destination = aws_kinesis_firehose_delivery_stream.main.name destination_type = "kinesis-firehose" log_format = "json" log_type = "engine-log" } } # Redis パラメタグループ # ログ取得は Redis 6.x 系で利用出来る機能につき、パラメタグループも Redis 6.x 系を対象にする resource "aws_elasticache_parameter_group" "main" { name = "$ { var.env } -redis6" family = "redis6.x" /* どのくらい処理に時間が掛かれば slow な処理として slow log へ出すかを制御する マイクロ秒単位で設定する。デフォルトは 10000(= 10ミリ秒) */ parameter { name = "slowlog-log-slower-than" value = 10000 } } # ログ保存先 S3 バケット resource "aws_s3_bucket" "main" { bucket = "$ { var.env } -elasticache-log" } 必要になるリソースは以下の通りです。 当然用意が必要なもの S3 バケット Redis クラスタ ( レプリケーション グループ) Data Firehose ストリーム 注意が必要なもの Data Firehose が使用する IAM ロールと権限認可のための IAM ポリシ firehose.amazonaws.com な principal を assume できるよう設定 S3 バケット / オブジェクトを変更含め操作する為の権限を設定 Data Firehose は転送エラーを CloudWatch Logs へ記録するので CloudWatch Logs 向けの権限も設定 Redis パラメタグループ slowlog-log-slower-than というパラメタで slow ログに記録する対象となる処理時間の 閾値 を設定(マイクロ秒単位) ここで指定した時間以内で終わる処理はログに記録されないので注意 Lambda 関数 「Datadog Logs 向けに slow ログを修正する」の節で述べた通り slowlog-log-slower-than パラメタによって slow ログとしての記録がなされるか否かについては見落しがちなので注意が必要です。遅い処理が存在しない場合ログにも記録されません。Redis / Data Firehose / S3 各所の設定は問題ないはずなのに何故ログが出ないのだと悩む羽目にならないよう、適当な 閾値 を設定しておきましょう。 Datadog Logs の風景 実際に Datadog Logs 上で収集されたログをお見せできればと思います(所々伏せ字なのはご容赦ください) slow ログ engine ログ 既に扱った通り、slow / engine ログはログ中に自身が slow / engine ログである旨を示す情報を持っていません。各ログに固有の情報でログの種類を判定するのが手軽です。弊社では以下のように @Role の有無でログ判別を行うような view を Datadog Logs へそれぞれ追加し、運用しています。 slow ログ: service:aws -@Role:* engine ログ: service:aws @Role:* Data Firehose + Lambda によりログ内容の調整が可能な構成になっていることから、必要に応じてログ中へ @type: slow といったような値の盛り込みも検討できる状態ではあります。これは運用を続けてみてからの判断となるでしょう。 おわりに ここまで本稿を読んで頂き有り難うございました。Data Firehose を使用して ElastiCache Redis ログを S3 及び Datadog Logs で取り扱う為の設定について扱いました。 engine ログと slow ログとで Datadog Logs へのログ送信に際し検討すべきことに差が出るのは想定外でしたが、最終的には狙った結果を得ることが出来ました。 ログは取得を始めて直ちに何らか嬉しさが生じるようなものではありませんが、月日が経過し記録と実績が蓄積されゆく中で自ずとその価値や用途が見えてくるものとは成り得るはずです。不確定要素の多い将来への投資としての一助となれば幸いです。 MNTSQ 株式会社 SRE 秋本 参考 Log delivery - Amazon ElastiCache Send AWS Services Logs with the Datadog Amazon Data Firehose Destination Understand data delivery in Amazon Data Firehose - Amazon Data Firehose [アップデート] ElastiCache for RedisのスローログをCloudWatch LogsやKinesis Data Firehoseにパブリッシュできるようになりました! | DevelopersIO
アバター
SREチームマネージャーの藤原です。 LM Studio + Gemma 3 + Cline + VSCode の環境を自由研究的に試用したので、その報告エントリです。 モチベーション プライベートでコードを書く際も最近はClineなどを使ってLLMを使ってコーディングをすることが徐々に増えてきました。 VSCode とClineを組み合わせて外部サービスをつかってコードの変更作業を実施する場合、 何かコードの変更を依頼するたびに、財布の中身から少しずつお金が溢(こぼ)れていく感覚があるでしょう。 1回1回の額は少額とはいえ、多数回繰り返すとなかなかの金額になってきます。 会社では予算の範囲内であれば、利用できますが、個人開発の場合はなかなか躊躇してしまうこともあるでしょう。 また、先日 Google が公開したオープンなローカルLLMのGemma 3も話題になったりしています。 そこで API 課金に怯えることなくLLMを活用したコーディングができないか?ということでGemma3を使ったコーディングにチャレンジしてみました。 やったこと Clineからローカルマシンで動かしているGemma 3のモデルを利用してコードを作成させてみる。 試行に用いた環境 昨年くらいにインフルエンザにかかって熱に浮かされた際に購入したマシンで試してみます。 HP ZBook Fury 16 G9 CPU: Core i9 12950HX メモリ: 64GB GPU : NVIDIA A4500 Laptop GPU 16GB OS: Ubuntu 22.04 あらかじめ、GEMMA 3を動かす上で必要となる NVIDIA の プロプライエタリ ドライバや、CUDAは導入済みです。 また ファイアウォール 等も設定済みです。 環境構築 LM Studioの導入 ローカルLLMを動かすためのツールとしてLM Studioを導入します。 LM Studioの公式ページ にアクセスして、LM Studioをダウンロードします。 lmstudio.ai Linux 向けには、 AppImage 版が用意されており、libfuse2さえインストールされていればシングルバイナリで動作するようになっています。 Linux 向けLM Studioのダウンロード画面 ダウンロードしたAppImageファイルを実行できるようにファイルのプロパティを変更します。 ファイルに実行権限を付与しましょう。 ダウンロードしたファイルのアイコンを右クリックして、ファイルのプロパティを開きます。 アクセス権のタブを開いて、プログラムとして実行可能の チェックボックス にチェックを入れます。 ダウンロードした LM Studioのバイナリファイルのプロパティ これでLM Studioの導入は完了です。 それでは、アイコンをダブルクリックしてLM Studioを立ち上げてみましょう。 LM Studio Gemma 3の取得と設定 LM Studioの画面下部でDeveloperに設定します。 画面下部の設定項目 ウィンドウ左側の虫眼鏡アイコンを選択します。 モデルの検索画面が表示されるので、Gemma 3を検索します。 LM Studioのモデル検索画面 検索結果から Gemma3 4Bを選択してダウンロードします。 元の画面に戻りターミナル風のアイコンをクリックします。 Status: Stopped となっているトグルスイッチを切り替えるとCline等からアクセスするためのサーバーが起動します。 Settings ドロップダウンメニューから追加の設定ができます。 他のマシンからアクセスする際は ローカルネットワークでサービング を有効にします。 LM Studioのサーバー設定 次にサーバー経由で提供するモデルを読み込んでおきます。画面上部の 読み込むモデルを選択 ドロップダウンメニューを選択します。 ダウンロード済みのモデル一覧が表示されるので、Gemma 3 4B Instructを選択します。 読み込むモデルの一覧 モデルの読み込みが完了しましたが、このままではコンテキスト長が不足しているため、Cline経由で利用できません。 右側のモデルの設定画面から Load タブを開いて、コンテキスト長を設定します。 ここでは、50000を設定しました。 モデル利用の設定 設定を変更したので 変更の適用 ボタンをクリックして設定変更を反映します。 Info タブを開くと接続に必要な情報が表示されます。 LM Studioへの接続情報 VSCode (Cline)の設定 VSCode を起動し、 拡張機能 でClineを検索してインストールします。 拡張機能 の マーケットプレイス でClineを検索 VSCode の左側のClineのアイコンをクリックします。 VSCode 上のClineアイコン Clineの画面が開くので画面右上の歯車アイコンをクリックして設定画面を開きます。 Clineの画面抜粋 Clineの設定画面が表示されるので、LM Studioに接続するように設定します。 API Providerに LM Studio を選択します。 Base URLにはLM StudioでGamma 3を設定した際に表示された接続情報を設定します。 ここまで設定すると、利用可能なモデルが表示されるので gemma-3-4b-it を選択します。 Clineの設定画面 ここまで設定したら Done ボタンをクリックして設定完了です。 では試みに動かしてみましょう。 動かしてみる Clineを使って Fizz Buzz の bash スクリプト を書かせてみました。 Fizz Buzzを書かせてみる いくつかのプロンプトや何度かの指示出しを試してみましたが、残念ながら gemma-3-4b-it では正しい Fizz Buzzプログラムを出力するまでは、結構な手数が必要でした。 プロンプトの書き方についてはまだまだ改良の余地はありそうなのでこの辺りを磨いていくことで改善はできるかもしれません。 速度的には、外部のサービスを利用するよりもかなり高速に動作しました。試行錯誤する観点では、かなりストレス少なく利用できると思います。 また、(ハードがすでにあれば)電気代のみなので、その点ではコストを気にすることなく安心して利用できました。 モデルを変えた場合どうなるかを検証する目的で gemma-3-12b-it を使っての出力も試してみました。 デフォルトの設定のままでは十分な トーク ン数(Clineの要求するだけのもの)を設定することができませんでした。 一方で、 gemma-3-12b-it が動作するよう設定を変更した場合は正しく動作し、 Fizz Buzzプログラムを正しく生成することはできましたが、実用的とはいいがたい出力速度でした。 まとめ 今回は試みにGemma 3 + LM Studio + Cline + VS Code で生成AIをつかったコーディングができないかを検証してみました。 最低限は動作するところまでは確認できました。 今後もモデルの改良は続くでしょうし、将来的にはより使いやすいものになることは間違いないので、時間を見つけて新しいモデル活用なども含めて試行錯誤を重ねる価値はありそうです。
アバター
はじめに MinIOについて データ移行の要件 データ移行手順 帯域制御の方法 おわりに はじめに MNTSQでSREチームに所属している中岡です。 昨今ではコンテナ技術を使用してアプリケーションを稼働させることが一般的になっています。 コンテナが稼働する環境であれば、理論上は AWS などの クラウド 上でも、オンプレでも、自分の端末でも同じように動作するはずです。 そのため、開発者が自身の端末内に開発環境を構築し、そこで開発を進めた上で AWS 上の環境にデプロイするというケースも多いかと思います。 その際、アプリケーションで使用するデータ保持のため、 RDB やオブジェクトストレージが必要になります。 RDB であれば、 MySQL や PostgreSQL をコンテナで稼働させれば問題ありません。ではオブジェクトストレージはどうすれば良いでしょうか? AWS ではS3(Simple Storage Service)というオブジェクトストレージのマネージドサービスがあります。 ローカル上で AWS と同じ構成を模して開発するには、同じようにストレージを扱う必要があります。 MinIOについて 弊社では、MinIOというS3互換ストレージの OSS を使用しています。 MinIOは、 GNU Affero General Public License v3.0 に基づいてリリースされた高性能オブジェクト ストレージです。 Amazon S3 と API 互換性があります。 MinIO Object Storage for Container — MinIO Object Storage for Container 実際にMinIOをコンテナ環境で稼働させた印象として、安定しており、非常に使いやすいと感じています。 ※本稿では、MinIOの具体的な設定方法などには触れません データ移行の要件 さて、ここからが本題となります。 弊社では以下のような要件が発生しました ローカル環境やオンプレ環境で使用したデータを、そのまま AWS のS3上に移行したい 移行したデータに破損や漏れがないかをチェックしたい ローカルから AWS 上にデータを移行する時、ネットワークの帯域を使い切ってしまい、他の通信に悪影響を与えてしまう事を避けたい 検討の結果、MinIOのcommand line toolとして提供されている mc が優秀で、上記の問題は全てこのツールで対処できました。 MinIO Client — MinIO Object Storage for Linux なお、弊社にて動作実績のある環境は、CentOS7及びRHEL8.9となります。 データ移行手順 mc(minio client)のインストール 以下の手順でインストールします。 $ wget https://dl.min.io/client/mc/release/linux-amd64/mc -- https://dl.min.io/client/mc/release/linux-amd64/mc dl.min.io (dl.min.io) をDNSに問いあわせています... 138.68.11.125, 178.128.69.202 dl.min.io (dl.min.io)|138.68.11.125|:443 に接続しています... 接続しました。 HTTP による接続要求を送信しました、応答を待っています... 200 OK 長さ: 27496600 (26M) [application/octet-stream] `mc' に保存中 100%[==============================================================================================>] 27,496,600 4.83MB/s 時間 5.9s YYYY-MM-DD HH:MM:SS (4.47 MB/s) - `mc' へ保存完了 [27496600/27496600] $ sudo chmod +x mc $ sudo mv mc /usr/local/bin/mc $ mc --version mc version RELEASE.2024-10-08T09-37-26Z (commit-id=cf128de2cf42e763e7bd30c6df8b749fa94e0c10) Runtime: go1.22.8 linux/amd64 Copyright (c) 2015-2024 MinIO, Inc. License GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html> これでmcがインストールできました。 mcの基本的な使用方法は、データ操作をしたい対象に エイリアス の設定をすることです。 以下は初期状態です。 $ mc alias ls mc: Configuration written to `/home/mntsq/.mc/config.json`. Please update your access credentials. mc: Successfully created `/home/mntsq/.mc/share`. mc: Initialized share uploads `/home/mntsq/.mc/share/uploads.json` file. mc: Initialized share downloads `/home/mntsq/.mc/share/downloads.json` file. gcs URL : https://storage.googleapis.com AccessKey : YOUR-ACCESS-KEY-HERE SecretKey : YOUR-SECRET-KEY-HERE API : S3v2 Path : dns local URL : http://localhost:9000 AccessKey : SecretKey : API : Path : auto play URL : https://play.min.io AccessKey : Q3AM3UQ867SPQQA43P2F SecretKey : zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG API : S3v4 Path : auto s3 URL : https://s3.amazonaws.com AccessKey : YOUR-ACCESS-KEY-HERE SecretKey : YOUR-SECRET-KEY-HERE API : S3v4 Path : dns このサンプルを見て分かる通り、MinIOで管理しているストレージだけでなく、 AWS のS3や Google Cloud Storage(以下GCS)もalias設定の対象にすることができます。その際、AccessKeyおよびSecretKeyを使ってアクセスしますが、aliasコマンドでkeyが表示されてしまうため、移行作業に限定したkeyを発行し、対象のS3Bucketを限定するなど、 AWS 側のアクセスキー設定には十分留意してください。 そして、データ移行が終わったら、速やかにkeyを削除しましょう。 では実際に エイリアス の設定をしていきます。MinIOはローカル上にDockerコンテナで起動している前提です。 また、S3は東京リージョンを使用しているため、URLも東京リージョン指定にしています。 移行元:MinIO(ローカルストレージ) 移行先: AWS のS3 mc mirror — MinIO Object Storage for Linux # エイリアスの登録 $ mc alias set mntsq-test-minio http://127.0.0.1:7100 <MINIO_ROOT_USER> <MINIO_ROOT_PASSWORD> Added `mntsq-test-minio` successfully. $ mc alias set mntsq-aws https://<S3BUCKET NAME>.s3.ap-northeast-1.amazonaws.com/ <ACCESS KEY> <SECRET KEY> Added `mntsq-aws` successfully. # エイリアスが登録されているかを確認 $ mc alias ls mntsq-aws URL : https://<S3BUCKET NAME>.s3.ap-northeast-1.amazonaws.com AccessKey : **************************** SecretKey : **************************** API : s3v4 Path : auto Src : /home/mntsq/.mc/config.json mntsq-test-minio URL : http://127.0.0.1:7100 AccessKey : ******** SecretKey : ******** API : s3v4 Path : auto Src : /home/mntsq/.mc/config.json これでデータ移行の準備ができました。 MinIOからS3にデータ移行をするには、mcの ミラーリング 機能を使います。 この例では、test配下のデータを ミラーリング します。 # ミラーコマンド実行 # 送信元、送信先の順番でエイリアス設定したバケットを指定 $ mc mirror mntsq-minio/test mntsq-aws/test 0 B / ? xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaax ...xxxxxxxx.pdf: 1.02 GiB / 1.02 GiB xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaax 10.31 MiB/s 1m41s $ ミラーリング が完了し、プロンプトが帰ってきたら完了です。 もし何かエラーが発生した場合は、その内容が標準出力されます。 また、途中で ミラーリング を停止した場合は、再度同じコマンドを実行すれば問題ありません。 最後に、データの移行がもれなく完了したかを確認します。 $ mc diff mntsq-minio/test mntsq-aws/test # 差分がない場合は、何も出力されずにプロンプトが返ってくる # diffの実行には時間がかかるため、sshでサーバに接続している場合などは途中でセッションが切れてしまう場合がある # データ量が多い場合は、以下のようにバックグランドで実行することを推奨 $ nohup mc diff mntsq-minio/test mntsq-aws/test > YYYYMMDD_mc_diff_log.txt 2>&1 & 帯域制御の方法 ここまででデータ移行は完了です。 ただし、この手順ではデータ移行でインターネット回線の帯域を占有し、他の通信に影響を及ぼす可能性があります。 mcにはアップロード、ダウンロードの帯域を制限するオプションもあり、これを併用することでその心配もなくなります。 mc mirror — MinIO Object Storage for Linux # ミラーコマンドにアップロードの帯域制限を指定 # この場合、30Mbpsの帯域制限をしてアップロードする $ mc mirror --limit-upload 3.57MiB mntsq-minio/test mntsq-aws/test ローカルから AWS へのアップロードになるため、上記のオプションとなります。方向が逆の場合は"--limit-download"を指定します。 おわりに ローカルでテストしたデータを使いたい、オンプレ環境のデータを クラウド 上に単純移行したいなど、オブジェクトストレージを扱う色々なケースがあるかと思います。そうした時に本稿の内容が参考になれば幸いです。
アバター
はじめに 要件の整理 構成 実装例 Stop Env (GitHub Actions) Control ECS (GitHub Actions) Update ECS Clusters (Lambda) おわりに はじめに 弊社MNTSQでは AWS 上にMNTSQ CLMをはじめとする複数のサービスを展開していますが、サービス運用が軌道に乗るにつれて、社内利用の環境( AWS アカウント)が開発環境、QA環境、ステージング環境と用途によって増えていき、コストの増加が無視できない問題となってきました。そこで、 GitHub Actionsを使用して、ECSサービスを夜間停止する仕組みを導入することにより、コストの削減を行いました。workflow_callを使用することにより、良い感じに再利用性のある仕組みが作れたと思うので、記事にしていきたいと思います。 ※ 以降 GitHub Actionsは単にワークフローと表記しています 要件の整理 弊社では、社内環境の停止を行うにあたって、以下のような要件を満たす必要がありました。 開発環境、QA環境、ステージング環境の3環境を対象に夜間停止を行う必要がある 平日は0時 ~ 8時, 休日は終日対象の環境を停止する 残業や休日稼働をする人がいるかもしれないので、手動での起動・停止ができる必要がある 構成 フローの全体図 この構成のポイントは以下です 上位の コンポーネント から「 AWS 環境」->「管理対象のECS クラスタ 」 -> 「ECS クラスタ に所属するECSサービス」と操作対象をスコープダウンしている フロー全体の基点となる"Start Env", "Stop Env"のワークフローを、スケジュール(自動)とworkflow_dispatch(手動)の両方でキックできる このような構成にすることにより各 コンポーネント を再利用性が高い形で作成することができ、管理しなければならないワークフローの数も少なくすることができました。また、以下のような形に拡張することにより、RDSやEC2などの他のコンピューティングリソースを停止対象に追加することも容易です。 対象のコンピューティングリソースを拡張した例 実装例 Stop Env ( GitHub Actions) フロー全体の基点となるワークフローです。スケジュール停止を行いたい場合は開発環境、QA環境、ステージング環境の3つの AWS 環境に対して操作を行いたいですが、手動で起動を行う場合は特定の AWS 環境のみを対象にしたいです。そのロジックを set-target のjobに内包し、このjobのoutputについて、後続の stop-ecs-clusters のjobをmatrixで起動することによって、ワークフローのトリガの種類による対象環境の差分を吸収しています。"Start Env"と"Stop Env"を分けているのは、 schedule をワークフローに記述する必要があったからです。workflow_dispatchで起動する際は環境を選択するだけなので、シンプルな使い心地になっていると思います。 Start Envはこのコードをコピぺして、操作を反転させればOKです。 # .github/workflows/control_stop_env.yml # 指定した環境を停止する # スケジュールで実行される場合は、development~stagingの環境を停止する # 手動で実行される場合は、inputで指定した環境を停止する name : "Stop Env" permissions : # wf_callを実行するためにはこの権限が必要 id-token : write contents : write on : schedule : - cron : '0 15 * * *' # JST 0:00 に自動実行 workflow_dispatch : inputs : environment : type : choice description : 'Environment to Stop' required : true default : 'development' options : - development - qa - staging jobs : # schedule起動の時は"development", "qa", "staging"を対象にする # workflow_dispatch起動の時は、inputで指定されたAWS環境を対象にする set-target : runs-on : ubuntu-latest outputs : target : ${{ steps.set.outputs.target }} steps : - name : Set Target Environment id : set run : | if [ "$GITHUB_EVENT_NAME" = "schedule" ] ; then target='["development", "qa" , "staging" ] ' else target=' [ "${{ inputs.environment }}" ] ' fi echo "target=$target" >> $GITHUB_OUTPUT # 管理対象のECSクラスタを一括管理するWFをCallする stop-ecs-clusters : name : "Stop ECS Clusters" uses : ./.github/workflows/control_ecs_clusters.yml needs : set-target strategy : # 操作対象のAWS環境ごとに並列で起動する matrix : target : ${{fromJson(needs.set-target.outputs.target)}} with : environment : ${{ matrix.target }} action : STOP # stop-ecs-clustersと並列に、別のリソースを管理するWFをcallするjobを追加して拡張できる # stop-rds-clusters: # name: "Stop RDS Clusters" # uses: ./.github/workflows/control_rds_clusters.yml # needs: set-target # strategy: # matrix: # target: ${{fromJson(needs.set-target.outputs.target)}} # with: # environment: ${{ matrix.target }} # action: STOP Control ECS ( GitHub Actions) 管理対象のECS クラスタ を一括停止するワークフローです。ただし、"管理対象のECS クラスタ "の部分の実態は、運用により大きく異なるかと思います。対象の判定ロジックをワークフロー, Lambdaのどちらに置くのが適切かは、ケースバイケースになるはずです。今回はシンプルに、ワークフロー内に管理対象のECS クラスタ を列挙する形の実装例を置いておきます。 ※ 弊社環境は開発環境内に複数の開発面を持っている都合上、管理すべきECS クラスタ の数が多く、対象判定ロジックをワークフローとLambdaで分割して持つ、もう少し複雑な実装になっております。このサンプルコードは弊社のコードから余分な処理を削ぎ落としたものであり、実際の動作を確認したわけではないので、参考程度にご覧ください。 inputには environment (対象AWS環境)と action を要求します。 action は STOP or START を渡し、後続のLambdaでECSサービスを停止するのか起動するのかを制御する変数です。別のワークフローから呼ばれるので、 workflow_call のブロックも記述しています。 # .github/workflows/control_ecs_clusters.yml # 指定した環境のECSクラスターを停止する name : "Control ECS Clusters" env : ASSUME_ROLE_ARN : "arn:aws:iam::%AWS_ACCOUNT_ID:role/oidc-gha-role" LAMBDA_UPDATE_ECS_CLUSTER_NAME : "update-ecs-cluster" permissions : id-token : write contents : write on : workflow_dispatch : inputs : environment : type : choice description : 'Environment to apply' required : true default : 'development' options : - development - qa - staging action : type : choice description : 'Action to apply' required : true default : 'STOP' options : - START - STOP workflow_call : inputs : environment : type : string required : true action : type : string required : true jobs : # 必要な変数を組み立てる # AWSアカウント毎の差分が.github/configs/aws/variables.ymlというファイルに記述されていることを前提としている setup-env : runs-on : ubuntu-latest outputs : ASSUME_ROLE_ARN : ${{ steps.setup-env.outputs.ASSUME_ROLE_ARN }} steps : - name : Checkout uses : actions/checkout@v4 - name : Setup ENV id : setup-env run : | ENV=${{ inputs.environment }} # 設定ファイルの<env>.AWS_ACCOUNT_IDというフィールドから、アカウントIDを取得している AWS_ACCOUNT_ID=$(yq -r ".[ \" $ENV \" ].AWS_ACCOUNT_ID" .github/configs/aws/variables.yml) ASSUME_ROLE_ARN=$(echo $ASSUME_ROLE_ARN | sed -e "s/%AWS_ACCOUNT_ID/$AWS_ACCOUNT_ID/g" -e "s/%ENV/$ENV/g" ) echo "ASSUME_ROLE_ARN=$ASSUME_ROLE_ARN" >> $GITHUB_OUTPUT # 対象のECSクラスタ毎に、所属するECSサービスに対して一括操作を行うLambdaを呼び出す control-ecs-clusters : needs : setup-env runs-on : ubuntu-latest strategy : matrix : # WF冒頭のenvブロックに記載するとfromJsonで展開できないので、ここに対象ECSクラスタを書く cluster : ${{ fromJson('["service1-cluster","service2-cluster"]') }} steps : - name : Configure AWS credentials uses : aws-actions/configure-aws-credentials@v4 with : aws-region : ap-northeast-1 role-to-assume : ${{ needs.setup-env.outputs.ASSUME_ROLE_ARN }} - name : Invoke Lambda UpdateEcsCluster run : | echo '{ "clusterName": "${{ matrix.cluster }}", "action": "${{ inputs.action }}" }' | jq -c > payload.json aws lambda invoke \ --function-name $LAMBDA_UPDATE_ECS_CLUSTER_NAME \ --payload file://payload.json \ --cli-binary-format raw-in-base64-out \ --invocation-type RequestResponse \ response.json Update ECS Clusters (Lambda) "clusterName"と"action"をinputとし、指定されたECS クラスタ の全サービスのタスク数を更新するLambdaのサンプルコードを置いておきます。こちらはnode.22.xで動作を確認しているものになります。 import { ECSClient , ListServicesCommand , UpdateServiceCommand } from "@aws-sdk/client-ecs" ; const ecsClient = new ECSClient ({ region : "ap-northeast-1" }) ; export const handler = async ( event ) => { try { const clusterName = event . clusterName ; const action = event . action ; const desiredCount = action === "STOP" ? 0 : 1 ; if ( ! clusterName ) { throw new Error ( "clusterName is required" ) ; } if ( action ! == "START" && action ! == "STOP" ) { throw new Error ( "action must be either START or STOP" ) ; } console . log ( `Processing cluster: ${ clusterName } ` ) ; // クラスタ内の全サービスを取得 let nextToken ; let serviceArns = [] ; do { const response = await ecsClient . send ( new ListServicesCommand ({ cluster : clusterName , nextToken })) ; serviceArns = serviceArns . concat ( response . serviceArns ) ; nextToken = response . nextToken ; } while ( nextToken ) ; if ( serviceArns . length === 0 ) { console . log ( `No services found in cluster: ${ clusterName } ` ) ; return { message : "No services found" , clusterName } ; } // 各サービスのタスク数を更新 await Promise . all ( serviceArns . map ( async ( serviceArn ) => { await ecsClient . send ( new UpdateServiceCommand ({ cluster : clusterName , service : serviceArn , desiredCount : desiredCount })) ; console . log ( `Updated service ${ serviceArn } to desiredCount: 0` ) ; })) ; return { message : "Successfully updated services" , clusterName , servicesUpdated : serviceArns . length } ; } catch ( error ) { console . error ( "Error updating services:" , error ) ; return { error : error . message } ; } } ; おわりに GitHub Actions を綺麗に実装するのはなかなか難しいですが、今回はシンプルで再利用性の高い形にできたと思うので紹介させていただきました。特に「操作対象をスコープダウンしながら設計する」という部分は、他のワークフローを作成する際にも役立つ考え方になるはずです。 SREのように内部改善やプラットフォーム維持を担うエンジニアは、直接的に売上を上げる機会が少ないからこそ、「コスト」に敏感である必要があります。ただし、コスト削減はそう単純ではなく、例えばテクニカルサポートや営業など、サービスを扱うすべての人が、将来に渡ってスムーズに業務を進められるかどうかも、見落としてはいけない「コスト」です。 業務全体を見渡してみると、もっと幅広い場面で GitHub Actions を活用できるはずです。そうした「小さな自動化の積み重ね」が、より良い運用環境を作っていくのだと思います。とりわけ、「夜間も環境が動いているのはもったいないよね」といったシンプルな課題は、落ちているチリ紙を拾うような気持ちでサクッと解決したいですね。 MNTSQ株式会社 SRE 西室
アバター
こんにちは!! SREチームマネージャーの藤原です。 2024年6月末から2025年2月頭にかけて、 入門 継続的デリバリー の読書会を実施し、完走したのでその報告エントリです。 www.oreilly.co.jp 勉強会の進め方 基本的な進め方としては、 過去エントリ にて解説した通りの進め方に則る形としました。 tech.mntsq.co.jp つまり、 事前に対象とする章を定める 参加者は対象の章を読む 参加者は気になった部分などを引用しながら所感をなどを Google Docs に記載する 当日はそれぞれ読んだ内容についてDocs記載内容について説明しながらディスカッションする の形で進めました。 勉強会のログサンプル 書籍の内容について 書籍の内容としては、架空のシステムを対象にストーリー仕立てでCI(Continuous Integration; 継続的インテグレーション )やCD(Continuous Delivery; 継続的デリバリー)においてよくある問題とその対処方針をまとめています。 流れとしては、事例を挙げた上で個々の状況においてどのような点に問題があるのかを解説しています。問題を抱えた現状を改善してより良い状況に持っていくにはどう考えるか?どう対処するか?を基本となる考え方を示しつつ、アクションを定めていくような形をとっています。 なぜバージョン管理が重要なのか?といった基本的なことについても当然のこととして切り捨てるのではなく改めて丁寧に確認するような形となっています。また、テストそのものやCI/CDパイプラインから得られるシグナルをどうとらえるか、シグナルに比してノイズが多すぎる状況はどのような問題を開発組織にもたらすか?などさまざまな観点からプロダクトの 開発プロセス におけるデリバリーに関わるプロセス上の課題について議論しています。 さらに、テストやビルドの技術的な問題を解決していく中で、DORAメトリクスなどを使ってデプロイに関連した組織パフォーマンスを測定することなども記述されています。 終盤では、CI/CDのパイプラインを構築運用していく上での考え方が述べられています。問題が発生した際の トラブルシューティング に必要なシグナルや、そもそもCI/CDのパイプラインを構成している スクリプト もコードであり、各種ソフトウェアエンジニアリングにおけるプ ラク ティスが適用できることなどが述べられています。 勉強会を通じての感想 事前に読んで気になった点をピックアップしたり、勉強会の中で気になった点などをコメントしたり、個々人が勉強会の中で発言した内容をコメントと残す中で、最終的に Google Docs 上では、A4で37ページ、コメント数は120-130程度の大作になりました。 個々の回では対象となっている章のどの部分に勉強会参加者が興味があるのか?が引用した部分やコメントから浮き彫りになりました。 書籍の内容としては、(藤原個人からみると)特別な内容はなく、よくある課題とそれらへの対応方針を丁寧に 言語化 してくれています。 議論の端緒として非常に有用な書籍でした。 コードを提示した上でどう直すか?といったことはほぼないので、写経して学ぶといったスタイルの書籍ではありません。 それよりは直面している問題をどう捉え、どう対処するかについての指針が多く述べられています。 このようなことから、 読み進めるに際しては、”書籍中で述べられているこの問題は自分たちの抱えているプロダクトにおいてどの部分に対応するだろう?”といった観点から、書籍で述べられている内容をベースに具体的なアクションを議論することでより得られる学びは多くなる と思いました。
アバター
はじめに DMSを使ってMySQLの移行をする際に気をつけたいこと7選! その1. DMSのログを出力するには決まった名前のIAMロールが必要である その2. CDCを有効にするにはソースDBでバイナリログを出す必要がある その3. GENERATEDカラムは移行対象から除外せよ その4. LOB型のカラムがある場合はターゲットDBでNOT NULL制約を一時解除せよ その5. 完全LOBモードの設定が必要か確認せよ その6. AUTO_INCREMENTは手動で移行する必要がある その7. 移行後の検証の設計は慎重に おわりに はじめに データベース移行というのは非常にセンシティブな作業であり、この使命を背負ってしまったエンジニアの皆様におかれましては、さぞ胃に優しくない日々を送っていることかと存じます。そんな私たちの心強い味方が AWS DMSです。 AWS Database Migration Service (以下DMS) は、 AWS が提供する フルマネージドのデータベース移行サービス であり、オンプレミスや クラウド 環境間のデータ移行を可能にします。 MySQL 、 PostgreSQL 、 Oracle など多様なデータベースをサポートし、移行元と移行先の異なるエンジン間の変換も自動化。フルロード、CDC (変更データキャプチャ) による継続的 レプリケーション も可能で、 最小限のダウンタイムでデータベースをスムーズに移行できる のが大きな魅力です! そんな便利なDMSですが、当然使用する際に気をつけなければいけないことはあります。 本記事では、DMSを使用して60回以上のデータベース移行を行なったMNTSQ SREチームから、「 MySQL の移行をする際に気をつけたいこと7選!」をお届けしたいと思います。 ※ なお、本記事はDMS自体の説明や利用方法の解説記事ではございません DMSを使って MySQL の移行をする際に気をつけたいこと7選! その1. DMSのログを出力するには決まった名前のIAMロールが必要である まずはじめにDMSの移行タスクなどを作成すると思いますが、ここで Terraformの公式サンプルコード を見てみましょう。IAMロールの定義として、このような記述があるかと思います。 resource "aws_iam_role" "dms-cloudwatch-logs-role" { assume_role_policy = data.aws_iam_policy_document.dms_assume_role.json name = "dms-cloudwatch-logs-role" } 他のリソースと 命名規則 を揃えたかったとしても、 このロールのnameは変更してはいけません。 aws_dms_replication_instance のリソースの記述を見るとわかりますが、DMS インスタンス にこのロールをアタッチするわけではないのです。DMS インスタンス は暗黙的に特定の 命名 のロールを使用します。何故なのかは知りません。とにかく、他のリソースのような感覚で名前を変更してしまうと、後々動作確認の際に、エラーになっても原因調査が進まないといったことになります。(一応サンプルコードの コメントアウト に注意書きがありますが......) ちなみに dms-access-for-endpoint , dms-vpc-role も同様の理由でnameを変更してはいけません。 その2. CDCを有効にするにはソースDBでバイナリログを出す必要がある DMSの強力な機能であるCDC (変更データキャプチャ) は、 レプリケーション 開始後にINSERTされたレコードも移行先DBに反映させることができる機能です。これがあるため、稼働中の環境でも無停止で レプリケーション を進め、最小のダウンタイムで新データベースに移行することができます。ただし、CDCはソースDBでバイナリログを有効化していないと利用できません。 Aurora MySQL の場合は、パラメータグループで binlog_format=ROW , binlog_row_image=full に設定しておけばOKです。ただし適用にはソースDBの再起動が必要です。 その3. GENERATEDカラムは移行対象から除外せよ GENERATEDカラムは、他のカラムの値を基に自動計算されるカラムです。便利な機能ですが、DMSはこのカラムにも律儀に値をINSERTしようとしてしまい、そのまま移行を実行するとエラーとなってしまいます。 ソースDBにGENERATEDカラムが存在するときは、 aws_dms_replication_task の table_mappings に、以下のようなルールを記述して、移行対象から除外しましょう。除外しても、ターゲットDBにも適切にGENERATEDカラムの制約が設定されていれば、自動で計算された値が再び入るはずです。カラムが複数ある場合は、除外ルールも複数書きます。 resource "aws_dms_replication_task" "mysql" { replication_task_id = "replication-mysql" migration_type = "full-load-and-cdc" ~~ 省略 ~~ table_mappings = jsonencode(local.table_mappings_mysql) } locals { table_mappings_mysql = { rules = [ # GENERATEDカラムを移行対象から除外するルール { rule-type = "transformation", rule-id = "1", rule-name = "skip_generated_column", rule-target = "column", object-locator = { schema-name = <schema_name>, table-name = <table_name>, column-name = <column_name> # GENERATED制約がついているカラム名 }, rule-action = "remove-column" } ] } } その4. LOB型のカラムがある場合はターゲットDBでNOT NULL制約を一時解除せよ DMSはLOB型のカラムを含む行を移行する際、以下の2つのステップで処理を行います。 LOB列をNULLにしたまま行を作成 LOB列をUPDATEしてデータを挿入 このため、LOB型のカラムにNOT NULL制約がついている場合、1の処理でエラーとなってしまうようです。NOT NULL制約がついたLOB型のカラムを持つデータベースの移行を行う際には、DMSの移行タスクを実行する前にターゲットDB側の対象カラムから制約を解除し、移行後に元に戻しましょう。 MySQL のLOB型には TINYBLOB , BLOB , MEDIUMBLOB , LONGBLOB , TINYTEXT , TEXT , MEDIUMTEXT , LONGTEXT , JSON などがあります。 ※ 参考: DMSのAWS公式ドキュメント その5. 完全LOBモードの設定が必要か確認せよ DMSによるLOB型カラムの移行オプションには、次の2つのモードがあります。 制限付きLOB モード すべての LOB 値をユーザー指定のサイズ制限 (デフォルトは 32 KB) で移行します。サイズを制限を超えるLOBは移行されず、手動で移行する必要があります。 完全LOB モード サイズに関係なくテーブル内のすべての LOB データを移行します。 DMSのデフォルトは"制限付きLOB モード" なので、データベースの完全な移行を行いたい場合は明示的に"完全LOBモード"を設定する必要があります。一応 AWS 的には、まず"制限付きLOB モード"を試し、必要なら"完全LOBモード"に切り替えるという戦略を推奨しているみたいです。その主な理由は、"完全LOBモード"だとパフォーマンスが極端に落ちるためのようです。本来LOBはS3などを使用して管理するのが AWS 的なベストプ ラク ティスであり、そのような方法を検討して欲しいのだと思いますが、既に移行の計画に入ってからの変更は厳しい箇所かなと思います。ですので、 結局ほとんどのケースで"完全LOB モード"を使用することになるのではないかと思っています。 LOBモードの設定は、terraformの場合だと aws_dms_replication_task リソースの replication_task_settings.TargetMetadata.FullLobMode にbooleanで定義されています。"完全LOB モード"を使用する場合には FullLobMode=true に設定しましょう。 AWS コンソールから移行タスクを編集して設定することも可能です。 その6. AUTO_INCREMENTは手動で移行する必要がある DMSがサポートするのはあくまでアプリケーションデータであり、 INFORMATION_SCHEMA や performance_schema などの移行は行えません。プライマリキーなどに AUTO_INCREMENT を使用している場合、DMSでの移行後に値がリセットされ、新たなレコードが挿入できなくなるなどのサービス障害の原因となってしまいます。 これを防ぐにはAUTO_INCREMENTの値を手動で移行する必要があります。以下の記事などを参考にし、弊社では移行用 SQL を作成する スクリプト などを用意して、移行手順に組み込みました。注意点として、AUTO_INCREMANTの値を取得する時は SHOW CREATE TABLE <table_name>; などを使用しましょう。 INFORMATION_SCHEMA へのクエリでは、最新の値が取れないことがありますし、物理削除が行われるテーブルではAUTO_INREMENTと最新レコードのidにはズレが生じます。 blog.tocyuki.com その7. 移行後の検証の設計は慎重に データベース移行後には必ず、移行前後で差分が出ていないかの検証を行うかと思います。当然、弊社でも検証を行なっていましたが、移行作業初期にはやはりトラブルに見舞われることはありました。原因は様々でしたが、 移行後検証の完全性が保証されていれば、全てメンテナンスウインドウ中に検知できた ものであり、サービスのインシデントにつながることな無かったはずのものばかりでした。最終的に弊社では、以下のチェックを行う スクリプト を導入した結果、データベース移行に関するトラブルは起きなくなりました。 全テーブルの SELECT COUNT(*) FROM <table>; の結果を移行前後で突き合わせる スクリプト 全テーブルの SHOW CREATE TABLE <table_name>; の結果を移行前後で突き合わせる スクリプト これにより、 レコード数に差分がないこと 、 テーブル構造に差分がないこと 、 AUTO_INCREMENTの差分がないこと が保証できました。 なお、これは弊社の事例に基づく例であり、いかなるケースにおいても上記の確認が移行の完全性を保証するものではございません! 弊社の場合は事前にCDCで レプリケーション を行い、移行日にはサービスのメンテナンス時間をとってこれらを確認しましたが、無停止での切り替えなどを計画している場合は、上記の項目の確認は難しくなります。また、データベースの設計によっては、確認項目が不足しているケースもあるかもしれません。加えて、制限付きLOBモードを使用していた場合、移行がスキップされたLOBがあったとしても、この方法では気づけません。あくまで「最低限ここは確認したほうが良い」程度にお受け取りください。 いずれにせよ、 移行後の検証は、移行作業そのもの以上に慎重に設計すべきです。 おわりに 弊社MNTSQでは、ここ2年間ほどかけて大規模な アーキテクチャ 変更を行い、その一環として計67回の顧客環境のデータ移行を行いました。最初の1回目の作業を行った時は「え、この作業あと60回以上するの?」と、その "永遠" に絶望したものですが、この度、ついにその作業も完了したので、区切りとして記事を書かせていただきました。 本記事では、移行作業を設計・検証する際に、実際に私がハマった箇所を「気をつけたいこと」という形で紹介させていただきました。「ここだけ気をつけていればあらゆるトラブルを回避できる」というものではありませんが、これからDMSを使用してデータベース移行を行うという方の助けになれば幸いです。 MNTSQ株式会社 SRE 西室
アバター
......のですが、かなり苦戦しました。この記事に辿り着いた人はすでにハマっている、もしくはこれからハマる運命(さだめ)にある人も多いと思うので、そのような人の助けになればと思い、記事にして残しておきます。 結論からお伝えすると、Lambdaを使わずに通知を行うことは可能ですが、設定は少し複雑かなという印象でした。 しかし、一度設定出来てしまえば、同じようなことをしたい時の実装コストをグッと抑えられる、とても良い仕組みだと思います。 構成について この構成のメリット この構成のデメリット AWS Chatbotの認証を行う terraformでデプロイしてみる SNS -> Chatbot -> Slackの部分 DynamoDB Stream -> EventBridgePipes -> EventBridgeCustomBusの部分 EventBridgeRule -> SNS の部分 おわりに 構成について Lambdaではなく、 AWS Chatbotを使用してSlackへの通知を行う構成です。 この構成のメリット Lambdaのランタイムやコードの管理から解放される 「 SNS -> Chatbot -> Slack」,「EventBridgeRule -> SNS 」 の部分の汎用性が高く、使いまわせる この構成のデメリット メッセージのカスタマイズ性に少しだけ欠ける 初めて設定する際はハマりどころが多い AWS Chatbotの認証を行う まずは、 AWS ChatbotからSlack ワークスペース に対しての認証を行います。 あらかじめSlack ワークスペース にChatbotのアプリをインストールしておく必要があります。 mntsq.slack.com ワークスペース にアプリをインストールしたら、Chatbotが通知を行いたいチャンネルにアプリを招待します。チャンネル詳細の「インテグレーション」タブから、「 AWS Chatbot」のアプリを招待します。 次に、 AWS コンソールからChatbotのページへ行き、Slackクライアントの設定を行います。 ブラウザでSlackログインしていた場合、Slackの認証ページにリダイレクトされるので、「承認」します。 その後「Slack チャネルを設定」という編集画面に遷移することがありますが、今回はterraformでデプロイを行うので、この画面は閉じてしまって大丈夫です。 terraformでデプロイしてみる 今回はこのような仕組みを作るものとします。 ファイルアップロード時にウイルススキャンを行い、感染ファイルが見つかった場合、DynamoDBのInfectedScanResultsというテーブルに書き込みを行う InfectedScanResultsに書き込みがあった場合、その内容をSlackのエラーチャンネルに通知する SNS -> Chatbot -> Slackの部分 まずは通知の起点となる SNS トピックとChatbotのチャンネル設定を作成します。 SNS # 後段のChatBotからSlackへの通知を行うためのSNSトピック resource "aws_sns_topic" "slack_notify_error" { name = "slack-notify-error" } # SNSにCloudWatchからのPublishを許可するポリシー data "aws_iam_policy_document" "allow_cloudwatch_to_publish_sns" { statement { actions = [ "sns:Publish" , ] effect = "Allow" resources = [ aws_sns_topic.slack_notify_error.arn, ] principals { type = "Service" identifiers = [ "cloudwatch.amazonaws.com" , ] } } } resource "aws_sns_topic_policy" "allow_cloudwatch_to_publish_sns" { arn = aws_sns_topic.slack_notify_error.arn policy = data.aws_iam_policy_document.allow_cloudwatch_to_publish_sns.json } Chatbot # chatbot用のIAMロール data "aws_iam_policy_document" "chatbot_assume_policy" { statement { effect = "Allow" principals { type = "Service" identifiers = [ "chatbot.amazonaws.com" ] } actions = [ "sts:AssumeRole" ] } } data "aws_iam_policy_document" "chatbot_slack_notify" { statement { effect = "Allow" actions = [ "sns:Subscribe" , "sns:ListSubscriptionsByTopic" , "sns:GetTopicAttributes" , "sns:Publish" ] resources = [ aws_sns_topic.slack_notify_error.arn ] } } resource "aws_iam_role" "chatbot_slack_notify_error" { name = "chatbot-slack-notify-error-role" assume_role_policy = data.aws_iam_policy_document.chatbot_assume_policy.json } resource "aws_iam_policy" "chatbot_slack_notify_error" { name = "chatbot-slack-notify-error-policy" description = "IAM policy for Chatbot to notify error to Slack" policy = data.aws_iam_policy_document.chatbot_slack_notify.json } # Chatbot チャンネル設定 resource "aws_chatbot_slack_channel_configuration" "slack_notify_error" { configuration_name = "slack-notify-error" slack_channel_id = "YOUR_CHANNEL_ID" slack_team_id = "YOUR_WORKSPACE_ID" logging_level = "INFO" sns_topic_arns = [ aws_sns_topic.slack_notify_error.arn ] iam_role_arn = aws_iam_role.chatbot_slack_notify_error.arn } terraform apply を実行したら、 AWS コンソールからリソースが作成されたことを確認してください。 Chatbotのコンソールから「テストメッセージの送信」を行い、Slackチャンネルに通知が届けばOKです。 ※弊社開発環境のものなので、微妙にリソース名が異なります なお、2024/12月現在、Chatbotのロググループは強制的に バージニア 北部に作成されるので、東京リージョンを彷徨わないようにご注意下さい。 DynamoDB Stream -> EventBridgePipes -> EventBridgeCustomBusの部分 DynamoDBに書き込みがあった時、それをCustomBusにEventとして送信するEventBridgePipesの設定を行います。 DynamoDB ## InfectedScanResultsテーブル resource "aws_dynamodb_table" "infected_scan_results" { name = "InfectedScanResults" billing_mode = "PAY_PER_REQUEST" hash_key = "ObjectPath" attribute { name = "ObjectPath" type = "S" } # EventBridge -> SNS -> Slack通知を行うためのストリーム stream_enabled = true stream_view_type = "NEW_IMAGE" } EventBridge # カスタムのイベントを記録するためのバス resource "aws_cloudwatch_event_bus" "notification" { name = "notification" } # PipeのためのIAMロール data "aws_iam_policy_document" "eventbridge_pipe_assume_role_policy" { statement { actions = [ "sts:AssumeRole" ] effect = "Allow" principals { type = "Service" identifiers = [ "pipes.amazonaws.com" ] } } } resource "aws_iam_role" "dynamodb_pipe_role" { name = "dynamodb-pipe-role" assume_role_policy = data.aws_iam_policy_document.eventbridge_pipe_assume_role_policy.json } data "aws_iam_policy_document" "dynamodb_pipe_policy" { statement { actions = [ "dynamodb:DescribeStream" , "dynamodb:GetRecords" , "dynamodb:GetShardIterator" , "dynamodb:ListStreams" , ] effect = "Allow" resources = [ aws_dynamodb_table.infected_scan_results.stream_arn ] } statement { actions = [ "events:PutEvents" ] effect = "Allow" resources = [ "*" ] } statement { actions = [ "logs:CreateLogStream" , "logs:PutLogEvents" ] effect = "Allow" resources = [ "*" ] } } resource "aws_iam_role_policy" "dynamodb_pipe_policy" { name = "dynamodb-pipe-policy" role = aws_iam_role.dynamodb_pipe_role.name policy = data.aws_iam_policy_document.dynamodb_pipe_policy.json } # ロググループ resource "aws_cloudwatch_log_group" "dynamodb_infected_scan_results_write" { name = "/aws/vendedlogs/pipes/dynamodb-infected-scan-results-write" } # InfectedScanResultsのストリームをEventBridgeに通知するPipe resource "aws_pipes_pipe" "dynamodb_infected_scan_results_write" { name = "dynamodb-infected-scan-results-write" role_arn = aws_iam_role.dynamodb_pipe_role.arn source = aws_dynamodb_table.infected_scan_results.stream_arn target = aws_cloudwatch_event_bus.notification.arn log_configuration { include_execution_data = [ "ALL" ] level = "INFO" cloudwatch_logs_log_destination { log_group_arn = aws_cloudwatch_log_group.dynamodb_infected_scan_results_write.arn } } source_parameters { dynamodb_stream_parameters { batch_size = 1 starting_position = "LATEST" } } target_parameters { eventbridge_event_bus_parameters { detail_type = "InfectedScanResultsWrite" source = "custom.dynamodb.infected-scan-results" } } } terraform apply を実行したら、DynamoDBにレコードを追加してみてください。ロググループ /aws/vendedlogs/pipes/dynamodb-infected-scan-results-write に dynamodb-infected-scan-results-write というストリームが作成され、いくつかログが出ているはずです。 # 最後のログがこんな感じだったらOK { "resourceArn": "arn:aws:pipes:ap-northeast-1:******:pipe/dynamodb-infected-scan-results-write", "timestamp": 1734671733500, "executionId": "8e3e7d3c-0c1e-4b47-a6b7-******", "messageType": "ExecutionSucceeded", "logLevel": "INFO" } ちなみにこのイベントはCroudTrailなどには記録されません。(最初はCroudTrailに記録されるものだと勘違いして時間を溶かしました)EventBridgeのコンソールからここら辺確認できるようになるとありがたいなぁ......と、しみじみ思います。 EventBridgeRule -> SNS の部分 最後に、CustomBusにイベントが送信された時に、それを拾って SNS にパブリッシュするRuleを作成します。Chatbotが受け取ることができる json の形式は決まっているので、EventBridgeの入力トランスフォーマを使用し、良い感じに整形します。 EventBridgeRule # busのイベントをSNSに通知するためのルール resource "aws_cloudwatch_event_rule" "dynamodb_infected_scan_results_write" { name = "dynamodb-infected-scan-results-write" description = "Send DynamoDB InfectedScanResults write events to EventBridge" event_bus_name = aws_cloudwatch_event_bus.notification.name event_pattern = jsonencode ( { source = [ "InfectedScanResultsPuts" ] detail-type = [ "custom.dynamodb.infected-scan-results" ] } ) } resource "aws_cloudwatch_event_target" "dynamodb_infected_scan_results_write_target" { rule = aws_cloudwatch_event_rule.dynamodb_infected_scan_results_write.name arn = aws_sns_topic.slack_notify_error.arn event_bus_name = aws_cloudwatch_event_bus.notification.name input_transformer { input_paths = { "ObjectPath" : "$.detail.dynamodb.NewImage.ObjectPath.S" , "ScannedAt" : "$.detail.dynamodb.NewImage.ScannedAt.S" , "Message" : "$.detail.dynamodb.NewImage.Message.S" } # jsonencodeを使用すると<, >などが文字コードに変換されてしまうのでTEMPLATEを使用する input_template = <<TEMPLATE { "version": "1.0", "source": "custom", "content": { "textType": "client-markdown", "title": "⚠️ウイルス感染ファイルが検出されました⚠️", "description": "<!subteam^YOUR_TEAM_ID>\n<ObjectPath>\n<ScannedAt>\n<Message>" } } TEMPLATE } } <!subteam^YOUR_TEAM_ID> の部分はメンションしたいSlackのTeamIDです。TeamIDの確認の仕方はここでは割愛します。Chatbotが解釈できる json の形式は、 こちらのドキュメント を参照しました。 これで全リソースの定義を作成できました。 terraform apply を実行して、もう一度DynamoDBにレコードを追加し、Slackに通知が飛んでくることを確認しましょう。 無事通知されました! おわりに 今回はLambdaを使用せず、DynamoDBへの書き込みをSlackに通知する方法について紹介させていただきました。EventBridge + SNS + Chatbotの構成は設定も簡単で再利用性が高く、監視モニタリングの整備をする際にはとても便利な仕組みですね。 本当はSlackのメッセージにカラーバーをつけたりとカスタマイズしたかったのですが、 AWS サポートに問い合わせたところ、2024/12現在ではそこまで細かい設定はできないようです。ただしEventBridgeRuleのターゲットにはHTTPエンドポイントも指定できるようなので、もっとこだわりたい人はこちらの方法を使ってみるのも良いかもしれません。 dev.classmethod.jp 以上、何かの助けになれば幸いです! MNTSQ株式会社 SRE 西室
アバター
こんにちは。MNTSQ( モンテスキュー )株式会社でQAエンジニアをしている坂本です。 今回は ソフトウェアテスト Advent Calendar 2024 の場をお借りして、 弊社の 自動テスト構築過程 をご紹介します。 QAメンバー4名だけでなく、PdMやSREにもご協力頂きながら進めており、 2025年1月からの稼働に向けた準備が大詰めの段階です。 品質と開発スピードの両立を目指したコラボレーションの様子が少しでも伝わればうれしいです。 テスト自動化の目的 MNTSQ社では アジャイル 開発を行っており、製品のリリースサイクルごとに新規開発機能のテストと、既存機能全体の リグレッション テストを実施しています。 リグレッション テストはテスト項目数が多いため、すこし早めにテスト実施担当者とスケジュール調整する必要があります。 そこで、テストスケジュールの自由度を高めることを目的として、 リグレッション テストの約9割を自動化することにしました。 テスト自動化ツールAutify 今回は、 Autify(オーティファイ) という自動テスト作成ツールを用いています。 主な機能は以下の通りです。 ノーコードのテストシナリオ作成 テストの定期実行 テストレポートの自動作成 テストの作成・管理・自動作成されたテストレポートの保管がWeb上で完結するため、URL一つで情報共有できるところがありがたいです。 テスト設計 手動 リグレッション テストをそのままAutifyシナリオにすることは、以下の理由で避けました。 製品の成長とともにテスト項目が膨大になっていた 徐々に継ぎ足されてきたため、テストの体系が見えにくくなってきていた その代わり、製品の性質を踏まえ、以下のテストを作成することにしました。 クリティカルパス の動作確認テスト CRUD テスト 権限テスト リグレッション テスト全体の1割についてはAutifyで実装しづらい動作であったため、手動テストとして継続することにしました。 テスト設計のレビュー テスト設計をQA内でピアレビューした後、特に重要な権限ごとの期待動作について各製品のPdMにもレビューを依頼しました。 レビューの過程で細かい仕様の認識違いも明らかになりましたので、色々な方にレビューをして頂けて大変助かりました。 テスト用データの準備 SREへ相談し、テスト用環境を特定のテスト用データで初期化できる仕組みを作って頂きました。 この仕組みはテスト実行の安定化に対して強力です。SREチームに大変感謝しています。 自動テストが途中で落ちた際にテスト過程で作成したデータが残ることがあるのですが、この仕組みがあれば毎回同じ条件でテストをスタートできます。 アカウント テストしたい権限を持つユーザーアカウントを作成します。 ファイル 処理結果をテストで確認したいファイルを用意します。 テスト用環境の設定値の調整 製品の実環境と同じ条件で作成されたテスト用環境に、一般的な利用場面を想定したユーザー設定値を追加していきます。 Autifyテストシナリオの実装 いよいよ、Autifyのシナリオを実装します。 Autifyレコーダーを起動させた状態で製品のUIを操作すると、その様子が記録され、Autifyの中にあるAIがステップに切り分け、テストシナリオ起こしてくれます。 このおかげで、テスト実装はコードを書かずに進めることができました。 頻度として多くはないのですが、HTMLと CSS の構成が特に複雑な画面では、 クリックしたい要素の特定方法をAutifyの中のAIがうまく割り出せないことがあります。 その場合は、人間が CSSセレクター や XPath で特定し直します。 大抵はブラウザの検証ツールからコピー&ペーストした CSS セレクタ で動くのですが、 もし画面の構成が複雑過ぎて、 CSS セレクタ ーをいい感じに簡略化できないときには、 Autifyのサポート窓口に相談すると、専門の方に手厚くフォローして頂けます。 リグレッション テストの9割を自動化する目標の元、 やることが多くて社内では細かい分析まで手が回らないことがありますので、 サポート頂けて本当に助かっています。 ひととおり実装を終えた感想 自動 リグレッション テストを設計する際に一番重視したのは、いかに安全にパターンを絞るかでした。 そのためには自社が提供するウェブアプリの特徴を捉え、必ず押さえなければいけないポイントを見極めることが重要でした。 このポイントの整理に一番力を注ぎました。 ポイントを整理してからテスト項目の骨格を組み立てることで、既存の手動 リグレッション テスト項目との比較もしやすくなり、 本当に移行して大丈夫か、既存のテスト項目から漏れているものはないか、追加すべきものはないかを検討する際の道筋を得られたと思います。 自動化したい内容の全体像を予め設計できたことで、目標の「9割」を実装したと表現しやすくなり、 チーム外とのコミュニケーションも取りやすくなりました。 実装後、自動テストの内容を、QAチームで日頃仕様している手動テスト項目書のフォーマットに書き起こし、 ドキュメントとして利用できるようにしています。ドキュメントを作成するとテストシナリオの粗が見えてくるので、 リファクタリング も同時に行っているのですが、最初にポイントを整理していたために迷わず作業できているように思います。 運用体制構築に向けて 来月から自動テストの運用フェーズに入ります。 ここまでは効率重視で、テスト自動化のための役割を分担して進めてきました。 プロジェクト管理 データ整備 テスト設計&実装 QAチームは比較的新しい組織で、4名のQA歴は長い方から1年3ヶ月、1年1ヶ月、5ヶ月、2ヶ月ですから、 よく協力して頑張っているのではないかと思います。 ここからは、QAチームの誰もが自動テストのメンテナンスが出来るようになろう! という目標に変わりますので、以下のような活動が始まります。 Autify勉強会 リグレッション テスト追加・削除基準のすり合わせ テストレポート作成方針のすり合わせ 年明けからの運用フェーズ立ち上げも頑張ろうという意気込みで、年末の振り返りとして自動テスト構築のまとめ記事を発信させていただきました。 ここまでお読み頂きありがとうございました!
アバター
こんにちは、MNTSQ でバックエンドエンジニアをやっております河久保です。 先日 Kaigi on Rails 2024 に参加してきたので、参加記をしたためます。 Kaigi on Rails は昨年に続き2回目の参加となります。 MNTSQ もアプリケーションのバックエンドは Ruby on Rails で実装されており、昨年に続き今年もスポンサーとして手を挙げさせていただきました。 個人としてプロポーザルを出したことや、担当しているシステムのより深いところまで触れる機会が増えたことで、昨年より主体的にカンファレンスに関われたと実感しています。 スポンサーボード 今回は以下の講演を聴講してきました kaigionrails.org 1日目 Hall Red Hall Blue 1 基調講演 2 Rails の仕組みを理解してモデルを上手に育てる - モデルを見つける、モデルを分割する良いタイミング - 3 そのカラム追加、ちょっと待って!カラム追加で増える ActiveRecord のメモリサイズ、イメージできますか? 4 モノリス でも使える!OpenTelemetryで Rails アプリのパフォーマンス分析を始めてみよう 5 cXML という 電子商取引 の トランザクション を支える プロトコル と向きあっている話 6 リリース8年目のサービスの1800個のERBファイルをViewComponentに移行した方法とその結果 7 ActionCableなら簡単? 生成 AIの応答をタイピングアニメーションで表示。実装、コスト削減、テスト、運用まで。 8 現実の Ruby / Rails アップグレード 9 (中抜け) (中抜け) 2日目 Hall Red Hall Blue 1 都市伝説バスターズ「Webアプリの ボトルネック はDBだから言語の性能は関係ない」 2 Cache to Your Advantage: フラグメントキャッシュの基本と応用 3 OmniAuthから学ぶOAuth 2.0 4 約9000個の自動テストの時間を50分から10分に短縮、 偽陽性 率(Flakyテスト)を1%以下に抑えるまでの道のり 5 Sidekiq vs Solid Queue 6 The One Person Framework 実践編 7 Data Migration on Rails 8 30万人が利用するチャットをFirebase Realtime DatabaseからActionCableへ移行する方法 9 サイロ化した金融システムを、packwerk を利用して無事故で リファクタリング した話 10 Identifying User Identity 11 基調講演 2日間通して振り返ると、それぞれの講演に連動があり Kaigi on Rails オーガナイザーからのメッセージを強く感じました。 One Person Framework Rails 7 から One Person Framework という旗を掲げて、 Rails 8 も当然その流れをくんだ機能・ コンポーネント を提供しています。 0(Idea) →→→→ 1( IPO ) というコンセプトは、 初期は資金もないし、人もいないから Rails Way に乗って最速で立ち上げる。 サービスが当たってグロースして IPO というころには資金もあるわけなので、そのときに顕在化した ボトルネック の アーキテクチャ を乗り換えれば使えばいいし、人も雇用できるでしょ。 と私は解釈しており、とても納得感のあるアプローチだなと思います。 『One Person』からは「じゃあ規模大きくなったらダメなの?」という疑問符浮かぶのも必然だと思います。 シンプルさ、 Rails Way の追求 小規模なアプリケーションであれば1人の脳に収まるでしょうが、サービスが成長するに従いそれは難しい問題となるでしょう。 加えて Ruby 言語の特性がゆえに避けられない問題(主に型の側面)もあります。 多人数が触るコードベースは時間とともに統制が取れなくなってくるでしょう。 そういった面のアプローチとして「シンプルに保つ」という話が、2つの基調講演で触れられていたのが印象的でした。 初日の palkan さんによる講演では、新しい層を持ち込むことへのアプローチについてサンプルコードを添えたレクチャーがありました。 ここではプレゼンテーション層へのインタフェースとなる Form オブジェクトを取り上げていたと記憶しています。 このアプローチは将来的なプロダクトコードのおぼろげながら抱いていたイメージに合っていたので、勇気づけられました。 2日目の 島田 さん講演で「オプションを手に入れよう」「シンプルさを維持するための修復」という点が響きました。 『オプション』とは何でも受けられる設計ということを指すのではなく、シンプルさを維持することで将来的に取れる択を残しておきましょうということ。 『修復』とは単に元の状態に戻すことではなく、損傷を直しつつそのときの環境、新しい技術を使ってより環境に適応した状態にすることで、これは創造的なことだよね。 と話されてました。 損傷を直しつつそのときの環境、新しい技術を使ってより環境に適応した状態にすること この部分が私の持つポリシー *1 とフィットして心震えました。 どちらも アーカイブ が公開されたらぜひとも見返したいです。 Solid Queue Rails 8 から ActiveJob のデフォルトバックエンドになるという「Solid Queue」からも初期は極力 ミドルウェア を排除するという強い意志を感じています。 willnet さんの講演を聞くまで DB ベースのキュー管理って大丈夫なんだろうか?という不安がありました。 しかし、事例紹介で 2,000万 job/日は捌けているよ(Sidekiq は 20,000 job/秒 !!)という言及されており、次回 rails new する際に触ってみようと思います。 Open Telemetry ちょうど Open Telemetry に興味を持ち始めたので、 ymtdzzz さんの講演を聞きました。 speakerdeck.com 今回は Rails アプリケーションという領域での紹介でしたが、トレースとアプリケーションログの連動のといったデモを見て、「これこれ、これがやりたいんよ」ってなりました。 弊社では Rails アプリケーションは DataDog の APM を導入しておりますが、 DataDog Log を掛け合わせて同様のこともできそうですが、 Open Telemetery でメトリクスとログをコレクトして、 DataDog なり他サービスなりに放り込んでリッチなビューアーとして利用するというアプローチは現実的だな思いました。 またフロントエンドからトレースを送ることで、ユーザーリク エス トから始まる各レイヤーのトレースが取得でき、サービス全体での ボトルネック の発見に向けて適切なアプローチをとることもできそうだという感覚を得られました。 ほかにもたくさん osyoyu さんの講演からも CPU バウンドなのか、 I/O バウンドなのか、その比率から適切な Puma スレッド数が変わってくるというはなし。 ohbarye さんの講演では、最近私達が db:migration のスキームに乗りつつ、手オペを減らしたのですが、それ以外のアプローチの紹介。 moro さんの講演の User モデルの拡張のアプローチはとても斬新(User テーブルは id だけもつ)で、提示されたコードもエレガント。 and more ... おわりに Kaigi on Rails 2024 を終えて色々とやりたいことが湧いてきて仕方がないです。 こういうワクワク感が出てくると目先のタスクがおざなりになりがちな性格なので、チームメンバーには「掛かり *2 気味になってたら、しっかり手綱を握って制御お願いします」と言ってまわっています💦。 2日目の各社の Drink Up から流れ着いた二次会、三次会、、、でも参加者と講演の話などで盛り上がりました。 懐がデカすぎる! @kawakubox 「ここのテーブルは全部俺が持つッ」 唐突なスケールにも対応できるMNTSQさんありがとうございます! #kaigionrails pic.twitter.com/arWbYF78ov — iberianpig(Kohei Yamada) (@nukumaro22) 2024年10月26日 弊社について「あぁ、もんてすきゅーさんね」と言われるくらいに認知はされ始めてきていると思いますが、プロダクトについてまったく知らないという方も多いと思います。 プロダクトについて知りたい!! Kaigi on Rails のことで話したい!! などありましたら、カジュアル面談でもよいですし、私の X アカウントでもよいので気軽にメッセージを送っていただけたらと思います。 careers.mntsq.co.jp Kaigi on Rails 2024 の クリエイティヴ は CC BY 4.0 のライセンス下で提供されております *1 : カジュアル面談でも飲みの場でもいくらでも話します *2 : 騎手と馬の呼吸が合わず、ちぐはぐな状態
アバター