本記事は、2025 年 11 月 17 日に公開された Introducing Amazon MWAA Serverless を翻訳したものです。翻訳はクラウドサポートエンジニアの山本が担当しました。 本日、AWS は Amazon Managed Workflows for Apache Airflow (MWAA) Serverless の提供を発表しました。これは MWAA の新しいデプロイメントオプションで、 Apache Airflow 環境の運用オーバーヘッドを排除しながら、サーバーレススケーリングによってコスト最適化を実現します。この新しいサービスは、データエンジニアと DevOps チームがワークフローのオーケストレーションで直面する主な課題、つまり運用スケーラビリティ、コスト最適化、アクセス管理を解決します。 MWAA Serverless では、プロビジョニングされた容量を監視するのではなく、ワークフローロジックに集中できます。Airflow ワークフローをスケジュール実行またはオンデマンド実行で送信でき、実際に各タスクが実行されたコンピュート時間分のみ支払います。サービスが自動的にすべてのインフラストラクチャをスケーリングするため、負荷に関係なくワークフローを効率的に実行できます。 運用の簡素化に加えて、MWAA Serverless は AWS Identity and Access Management (IAM) による細粒度のアクセス制御を実現する 新しいセキュリティモデル を導入しました。各ワークフローに独自の IAM 権限を設定でき、お客様の VPC 上で各タスクを実行できるため、個別の Airflow 環境を作成することなく、正確なセキュリティ制御を実装できます。このアプローチにより、セキュリティ管理のオーバーヘッドを大幅に削減しつつ、セキュリティ体制を強化できます。 この記事では、MWAA Serverless を使用してスケーラブルなワークフロー自動化ソリューションを構築およびデプロイする方法を紹介します。ワークフローの作成とデプロイ、 Amazon CloudWatch を使用した監視の設定、既存の Apache Airflow DAGs (Directed Acyclic Graphs) をサーバーレス形式に変換する実践的な例を紹介します。また、サーバーレスワークフローを管理するためのベストプラクティスを探り、監視とログ記録の実装方法を紹介します。 MWAA Serverless の仕組み MWAA Serverless は、ワークフロー定義を処理し、サービス管理の Airflow 環境で効率的に実行し、ワークフローの需要に基づいてリソースを自動的にスケーリングします。MWAA Serverless は、 Amazon Elastic Container Service (Amazon ECS) エグゼキュータを使用して、各個別のタスクを独自の ECS Fargate コンテナ上で実行します。これは、お客様の VPC またはサービス管理の VPC のいずれかで行われます。それらのコンテナは、Airflow 3 Task API を使用して、割り当てられた Airflow クラスターと通信します。 図 1: Amazon MWAA のアーキテクチャ MWAA Serverless は、タスクの分離によってセキュリティを強化するため、人気のあるオープンソース DAG Factory 形式に基づく宣言型の YAML 構成ファイルを使用します。これらのワークフロー定義を作成するには、2 つの選択肢があります。 Amazon Provider Package の AWS 管理オペレーター を使用するワークフローを YAML に直接記述します AWS が提供する python-to-yaml-dag-converter-mwaa-serverless ライブラリ ( PyPi から入手可能) を使用して、既存の Python ベースの DAG を YAML に変換します この宣言的アプローチには 2 つの主な利点があります。まず、MWAA Serverless がワークフロー定義を YAML から読み取るため、ワークフローコードを実行せずにタスクのスケジューリングを決定できます。次に、MWAA Serverless はタスクが実行されるときにのみ実行権限を付与できるため、ワークフロー全体に広範な権限を必要としません。その結果、タスクの権限範囲が正確に限定され、時間制限される、より安全な環境を実現できます。 MWAA Serverless のサービス検討事項 MWAA Serverless には、サーバーレスとプロビジョニングされた MWAA デプロイメントを選択する際に考慮すべき、以下の制限があります: オペレーターサポート MWAA Serverless は、Amazon Provider Package のオペレーターのみをサポートしています。 カスタムコードやスクリプトを実行するには、以下のような AWS サービスを使用する必要があります。 AWS Lambda は Python コードの実行に使用します。 AWS Batch 、 Amazon ECS 、 Amazon EKS は Bash 操作に使用します。 AWS Glue はサードパーティのデータ接続に使用します。 ユーザーインターフェース MWAA Serverless は Airflow UI を使用せずに動作します。 ワークフローの監視と管理には、 Amazon CloudWatch と AWS CloudTrail との統合を提供しています。 MWAA Serverless の利用 MWAA Serverless を使用するには、以下の前提条件とステップを完了してください。 前提条件 始める前に、次の要件が満たされていることを確認してください: アクセスと権限 AWS アカウント バージョン 2.31.38 以降の AWS Command Line Interface (AWS CLI) をインストール済み IAMロールとポリシーを作成・変更するための適切な権限が必要です。これには以下の IAM 権限が含まれます: airflow-serverless:CreateWorkflow airflow-serverless:DeleteWorkflow airflow-serverless:GetTaskInstance airflow-serverless:GetWorkflowRun airflow-serverless:ListTaskInstances airflow-serverless:ListWorkflowRuns airflow-serverless:ListWorkflows airflow-serverless:StartWorkflowRun airflow-serverless:UpdateWorkflow iam:CreateRole iam:DeleteRole iam:DeleteRolePolicy iam:GetRole iam:PutRolePolicy iam:UpdateAssumeRolePolicy logs:CreateLogGroup logs:CreateLogStream logs:PutLogEvents airflow:GetEnvironment airflow:ListEnvironments s3:DeleteObject s3:GetObject s3:ListBucket s3:PutObject s3:Sync インターネット接続可能な Amazon Virtual Private Cloud (VPC) へのアクセス 必要な AWS サービス – MWAA Serverless に加えて、以下の AWS サービスへのアクセス権が必要です: 既存の Airflow 環境にアクセスするための Amazon MWAA ログを表示するための Amazon CloudWatch DAG と YAML ファイル管理のための Amazon S3 権限制御のための AWS IAM 開発環境 Python 3.12 以降がインストール済み ワークフロー定義を保存するための Amazon Simple Storage Service (S3) バケット YAML ファイル編集用のテキストエディタまたは IDE その他の要件 Apache Airflow の概念に関する基本的な知識 YAML 構文の理解 AWS CLI コマンドの知識 注意 : この記事を通して、お客様自身の値に置き換える必要があるサンプルの値を使用しています: amzn-s3-demo-bucket をお客様の S3 バケット名に置き換えてください 111122223333 をお客様の AWS アカウント番号に置き換えてください us-east-2 をお客様が利用する AWS リージョンに置き換えてください。MWAA Serverless は複数の AWS リージョンで利用可能です。現在の提供状況については、 リージョン別の AWS サービス一覧 を確認してください。 サーバーレスワークフローの初期作成 まず、S3 オブジェクトのリストを取得し、そのリストを同じバケットのファイルに書き込む簡単なワークフローを定義しましょう。 simple_s3_test.yaml という新しいファイルを次の内容で作成してください: simples3test: dag_id: simples3test schedule: 0 0 * * * tasks: list_objects: operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator bucket: 'amzn-s3-demo-bucket' prefix: '' retries: 0 create_object_list: operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator data: '{{ ti.xcom_pull(task_ids="list_objects", key="return_value") }}' s3_bucket: 'amzn-s3-demo-bucket' s3_key: 'filelist.txt' dependencies: [list_objects] このワークフローを実行するには、上記のバケットを一覧表示し、書き込む権限を持つ実行ロールを作成する必要があります。このロールは、MWAA Serverless から引き受けられる必要もあります。以下の AWS CLI コマンドで、このロールとそれに関連するポリシーを作成します。 aws iam create-role \ --role-name mwaa-serverless-access-role \ --assume-role-policy-document '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "airflow-serverless.amazonaws.com" ] }, "Action": "sts:AssumeRole" }, { "Sid": "AllowAirflowServerlessAssumeRole", "Effect": "Allow", "Principal": { "Service": "airflow-serverless.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "${aws:PrincipalAccount}" }, "ArnLike": { "aws:SourceArn": "arn:aws:*:*:${aws:PrincipalAccount}:workflow/*" } } } ] }' aws iam put-role-policy \ --role-name mwaa-serverless-access-role \ --policy-name mwaa-serverless-policy \ --policy-document '{ "Version": "2012-10-17", "Statement": [ { "Sid": "CloudWatchLogsAccess", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" }, { "Sid": "S3DataAccess", "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:GetObject", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket", "arn:aws:s3:::amzn-s3-demo-bucket/*" ] } ] }' その後、YAML DAG を同じ S3 バケットにコピーし、上記の AWS CLI コマンドの実行結果に含まれる ARN に基づいてワークフローを作成します。 aws s3 cp "simple_s3_test.yaml" \ s3://amzn-s3-demo-bucket/yaml/simple_s3_test.yaml aws mwaa-serverless create-workflow \ --name simple_s3_test \ --definition-s3-location '{ "Bucket": "amzn-s3-demo-bucket", "ObjectKey": "yaml/simple_s3_test.yaml" }' \ --role-arn arn:aws:iam::111122223333:role/mwaa-serverless-access-role \ --region us-east-2 最後のワークフローを作成する AWS CLI コマンドの出力は WorkflowARN の値を返し、この値を使用してワークフローを実行します: aws mwaa-serverless start-workflow-run \ --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ --region us-east-2 出力には RunId の値が返され、その値を使用して実行したばかりのワークフローの実行状況を確認できます。 aws mwaa-serverless get-workflow-run \ --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ --run-id ABC123456789def \ --region us-east-2 YAML を変更する必要がある場合は、S3 にコピーし直して update-workflow コマンドを実行できます。 aws s3 cp "simple_s3_test.yaml" \ s3://amzn-s3-demo-bucket/yaml/simple_s3_test.yaml aws mwaa-serverless update-workflow \ --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ --definition-s3-location '{ "Bucket": "amzn-s3-demo-bucket", "ObjectKey": "yaml/simple_s3_test.yaml" }' \ --role-arn arn:aws:iam::111122223333:role/mwaa-serverless-access-role \ --region us-east-2 Python DAGs を YAML 形式へ変換 AWS は、オープンソースの Airflow DAG プロセッサを使用して Python DAG を YAML DAG ファクトリ形式にシリアル化する変換ツールを公開しています。インストールするには、次のコマンドを実行します。 pip3 install python-to-yaml-dag-converter-mwaa-serverless dag-converter convert source_dag.py --output output_yaml_folder 例えば、次の DAG を作成し、 create_s3_objects.py という名前を付けます: from datetime import datetime from airflow import DAG from airflow.models.param import Param from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator default_args = { 'start_date': datetime(2024, 1, 1), 'retries': 0, } dag = DAG( 'create_s3_objects', default_args=default_args, description='Create multiple S3 objects in a loop', schedule=None ) # Set number of files to create LOOP_COUNT = 3 s3_bucket = 'md-workflows-mwaa-bucket' s3_prefix = 'test-files' # Create multiple S3 objects using loop last_task=None for i in range(1, LOOP_COUNT + 1): create_object = S3CreateObjectOperator( task_id=f'create_object_{i}', s3_bucket=s3_bucket, s3_key=f'{s3_prefix}/{i}.txt', data='{{ ds_nodash }}-{{ ts_nodash | lower }}', replace=True, dag=dag ) if last_task: last_task >> create_object last_task = create_object python-to-yaml-dag-converter-mwaa-serverless をインストールしたら、次のコマンドを実行します。 dag-converter convert "/path_to/create_s3_objects.py" --output "/path_to/yaml/" 出力は次のようになります: YAML validation successful, no errors found YAML written to /path_to/yaml/create_s3_objects.yaml 実行結果の YAML は次のようになります。 create_s3_objects: dag_id: create_s3_objects params: {} default_args: start_date: '2024-01-01' retries: 0 schedule: None tasks: create_object_1: operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator aws_conn_id: aws_default data: '{{ ds_nodash }}-{{ ts_nodash | lower }}' encrypt: false outlets: [] params: {} priority_weight: 1 replace: true retries: 0 retry_delay: 300.0 retry_exponential_backoff: false s3_bucket: md-workflows-mwaa-bucket s3_key: test-files/1.txt task_id: create_object_1 trigger_rule: all_success wait_for_downstream: false dependencies: [] create_object_2: operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator aws_conn_id: aws_default data: '{{ ds_nodash }}-{{ ts_nodash | lower }}' encrypt: false outlets: [] params: {} priority_weight: 1 replace: true retries: 0 retry_delay: 300.0 retry_exponential_backoff: false s3_bucket: md-workflows-mwaa-bucket s3_key: test-files/2.txt task_id: create_object_2 trigger_rule: all_success wait_for_downstream: false dependencies: [create_object_1] create_object_3: operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator aws_conn_id: aws_default data: '{{ ds_nodash }}-{{ ts_nodash | lower }}' encrypt: false outlets: [] params: {} priority_weight: 1 replace: true retries: 0 retry_delay: 300.0 retry_exponential_backoff: false s3_bucket: md-workflows-mwaa-bucket s3_key: test-files/3.txt task_id: create_object_3 trigger_rule: all_success wait_for_downstream: false dependencies: [create_object_2] catchup: false description: Create multiple S3 objects in a loop max_active_runs: 16 max_active_tasks: 16 max_consecutive_failed_dag_runs: 0 DAG の解析後に YAML 変換が行われるため、DAG 内のタスクを作成する for 文が最初に実行され、結果の静的な3つのタスクリストがその依存関係とともに YAML ドキュメントに書き込まれることに注意してください。 MWAA 環境の DAG を MWAA Serverless に移行する プロビジョニングされた MWAA 環境を活用して、ワークフローを開発およびテストした後、サーバーレスで効率的にスケールして実行できます。さらに、MWAA 環境が互換性のある MWAA Serverless オペレーターを使用している場合は、その環境のすべての DAG を一度に変換できます。最初のステップは、MWAA Serverless が MWAA Execution ロールを信頼関係を介して引き受けられるようにすることです。これは MWAA Execution ロールごとに 1 回限りの操作で、IAM コンソールで手動で実行するか、または次の AWS CLI コマンドを使用して実行できます。 MWAA_ENVIRONMENT_NAME="MyAirflowEnvironment" MWAA_REGION=us-east-2 MWAA_EXECUTION_ROLE_ARN=$(aws mwaa get-environment --region $MWAA_REGION --name $MWAA_ENVIRONMENT_NAME --query 'Environment.ExecutionRoleArn' --output text ) MWAA_EXECUTION_ROLE_NAME=$(echo $MWAA_EXECUTION_ROLE_ARN | xargs basename) MWAA_EXECUTION_ROLE_POLICY=$(aws iam get-role --role-name $MWAA_EXECUTION_ROLE_NAME --query 'Role.AssumeRolePolicyDocument' --output json | jq '.Statement[0].Principal.Service += ["airflow-serverless.amazonaws.com"] | .Statement[0].Principal.Service |= unique | .Statement += [{"Sid": "AllowAirflowServerlessAssumeRole", "Effect": "Allow", "Principal": {"Service": "airflow-serverless.amazonaws.com"}, "Action": "sts:AssumeRole", "Condition": {"StringEquals": {"aws:SourceAccount": "${aws:PrincipalAccount}"}, "ArnLike": {"aws:SourceArn": "arn:aws:*:*:${aws:PrincipalAccount}:workflow/*"}}}]') aws iam update-assume-role-policy --role-name $MWAA_EXECUTION_ROLE_NAME --policy-document "$MWAA_EXECUTION_ROLE_POLICY" 正常に変換された各 DAG を順にループし、それぞれにサーバーレスワークフローを作成できます。 S3_BUCKET=$(aws mwaa get-environment --name $MWAA_ENVIRONMENT_NAME --query 'Environment.SourceBucketArn' --output text --region us-east-2 | cut -d':' -f6) for file in /tmp/yaml/*.yaml ; do MWAA_WORKFLOW_NAME=$(basename "$file" .yaml); \ aws s3 cp "$file" s3://$S3_BUCKET/yaml/$MWAA_WORKFLOW_NAME.yaml --region us-east-2 ; \ aws mwaa-serverless create-workflow --name $MWAA_WORKFLOW_NAME \ --definition-s3-location "{\"Bucket\": \"$S3_BUCKET\", \"ObjectKey\": \"yaml/$MWAA_WORKFLOW_NAME.yaml\"}" --role-arn $MWAA_EXECUTION_ROLE_ARN \ --region us-east-2 done 作成したワークフローのリストを表示するには、次のコマンドを実行します。 aws mwaa-serverless list-workflows --region us-east-2 モニタリングと可視化 MWAA サーバーレスのワークフロー実行ステータスは、 GetWorkflowRun API で返されます。結果には、その特定の実行の詳細が含まれます。ワークフロー定義にエラーがある場合、次の例のように RunDetail の ErrorMessage フィールドに返されます。 { "WorkflowVersion": "7bcd36ce4d42f5cf23bfee67a0f816c6", "RunId": "d58cxqdClpTVjeN", "RunType": "SCHEDULE", "RunDetail": { "ModifiedAt": "2025-11-03T08:02:47.625851 + 00:00", "ErrorMessage": "expected token ',', got 'create_test_table'", "TaskInstances": [], "RunState": "FAILED" } } 適切に定義されているが、タスクが失敗したワークフローは、 "ErrorMessage": "Workflow execution failed" を返します: { "WorkflowVersion": "0ad517eb5e33deca45a2514c0569079d", "RunId": "ABC123456789def", "RunType": "SCHEDULE", "RunDetail": { "StartedOn": "2025-11-03T13:12:09.904466 + 00:00", "CompletedOn": "2025-11-03T13:13:57.620605 + 00:00", "ModifiedAt": "2025-11-03T13:16:08.888182 + 00:00", "Duration": 107, "ErrorMessage": "Workflow execution failed", "TaskInstances": [ "ex_5496697b-900d-4008-8d6f-5e43767d6e36_create_bucket_1" ], "RunState": "FAILED" }, } MWAA Serverless タスクログは、CloudWatch ロググループ /aws/mwaa-serverless/<workflow id>/ に保存されます (ここで /<workflow id> は、ワークフローの ARN の一意のワークフロー ID と同じ文字列です)。特定のタスクのログストリームを取得するには、実行されたワークフローのタスクを一覧表示し、各タスクの情報を取得する必要があります。これらの操作を 1 つの CLI コマンドに組み合わせることができます。 aws mwaa-serverless list-task-instances \ --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ --run-id ABC123456789def \ --region us-east-2 \ --query 'TaskInstances[].TaskInstanceId' \ --output text | xargs -n 1 -I {} aws mwaa-serverless get-task-instance \ --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ --run-id ABC123456789def \ --task-instance-id {} \ --region us-east-2 \ --query '{Status: Status, StartedAt: StartedAt, LogStream: LogStream}' 実行結果は、次のようになります: { "Status": "SUCCESS", "StartedAt": "2025-10-28T21:21:31.753447+00:00", "LogStream": "workflow_id=simple_s3_test-abc1234def/run_id=ABC123456789def/task_id=list_objects/attempt=1.log" } { "Status": "FAILED", "StartedAt": "2025-10-28T21:23:13.446256+00:00", "LogStream": "workflow_id=simple_s3_test-abc1234def/run_id=ABC123456789def/task_id=create_object_list/attempt=1.log" } その時点で、CloudWatch の LogStream 出力を使用してワークフローをデバッグします。 Amazon MWAA Serverless コンソール で、ワークフローを表示および管理できます: AWS Lambda、Amazon CloudWatch、 Amazon DynamoDB 、 Amazon EventBridge を使用して詳細なメトリクスと監視ダッシュボードを作成する例については、 この GitHub リポジトリ の例を参照してください。 リソースのクリーンアップ このチュートリアルで作成したすべてのリソースを削除して、継続的な課金を避けるには、次の手順に従ってください。 MWAA Serverless ワークフローを削除する – すべてのワークフローを削除するには、次の AWS CLI コマンドを実行します: aws mwaa-serverless list-workflows --query 'Workflows[*].WorkflowArn' --output text | while read -r workflow ; do aws mwaa-serverless delete-workflow --workflow-arn $workflow done このチュートリアルで作成した IAM ロールとポリシーを削除します: aws iam delete-role-policy --role-name mwaa-serverless-access-role --policy-name mwaa-serverless-policy S3 バケットから YAML ワークフロー定義を削除します: aws s3 rm s3://amzn-s3-demo-bucket/yaml/ --recursive これらのステップを完了したら、AWS マネジメントコンソールで、すべてのリソースが適切に削除されたことを確認してください。CloudWatch Logs はデフォルトで保持されるため、ワークフロー実行の痕跡をすべて削除したい場合は、別途削除する必要があります。 エラーが発生した場合は、必要な権限を持っているか、リソースが存在するかを確認してから削除を試みてください。一部のリソースには、特定の順序で削除する必要がある依存関係がある可能性があります。 結論 この記事では、Apache Airflow ワークフロー管理を簡素化する新しいデプロイメントオプションである Amazon MWAA Serverless について説明しました。YAML 定義を使用してワークフローを作成する方法、既存の Python DAG をサーバーレス形式に変換する方法、ワークフローを監視する方法を実演しました。 MWAA Serverless には以下のような主要な利点があります: プロビジョニングのオーバーヘッドがありません 従量課金モデルです ワークフローの需要に基づいて自動的にスケーリングします 細かい IAM 権限によってセキュリティが強化されています YAML を使用してワークフロー定義が簡素化されています MWAA Serverless の詳細は、 ドキュメント を参照してください。 著者について John は開発者、システムアーキテクト、プロダクトマネージャーとして、スタートアップと大企業の両方で25年以上のソフトウェア経験を持ち、Amazon MWAA を担当する AWS プリンシパルプロダクトマネージャーです。