はじめに
はじめまして、入社1年目の寺嶋です。現在私はAWSを使ったデータ分析基盤の構築、保守、運用を行っています。 本ブログでは、データ分析基盤構築に日々携わっている中で理解したAWS Glueの基礎と簡単な構築例を紹介します。最後にTerraformを使って構築したリソースも載せるので、少しでも参考になったら嬉しいです!
AWS Glueとは
AWS Glue は、分析を行うユーザーが複数のソースからのデータを簡単に検出、準備、移動、統合できるようにするサーバーレスのデータ統合サービスです。100 を超える多様なデータソースを検出して接続し、一元化されたデータカタログでデータを管理し、ETL パイプラインを視覚的に作成、実行、モニタリングして、データをデータレイクにロードできます。
一般的なワークフロー(※1)に含まれる内容は次のとおりです。
- データカタログ(※2)でデータソースおよびターゲットを定義します。
- クローラー(※3)を使用して、データソースのテーブルメタデータでデータカタログに入力します。
- 変換スクリプトを使用して ETL ジョブ(※4)を定義し、データを移動および処理します。
- ジョブをオンデマンドまたはトリガーに基づいて実行します。
- ダッシュボードを使用してジョブのパフォーマンスをモニタリングします。
https://d1.awsstatic.com/webinars/jp/pdf/services/20210330_AWSBlackBelt2021_AWSGlueStudio.pdf
※1 ワークフロー: 複数のクローラ、ジョブ、およびトリガーを伴う複雑な ETL (抽出、変換、ロード) アクティビティを作成して可視化します。

※2 データカタログ : AWSのクラウド内でどこにどんなデータがあるのか、どんな情報が入っているのかを記録して管理する場所です。 簡単に言うと「データの住所録」みたいなものです。
※3 クローラー : データの中身を調べて、それがどんなデータなのかを理解し、データカタログにその情報を自動で追加してくれます。
※4 ETLジョブ : ソースからデータの抽出、Apache Spark スクリプトを使用して変換、ターゲットにロードするビジネスロジックです。ここにデータ処理するスクリプトを記述します。
構築例
今回はAWS Glueを使った簡単なデータ分析基盤を構築したいと思います。 データは、ID、氏名、年齢、性別、メールアドレスのカラムをもちます。 データ加工(ETL処理)では、氏名を削除し、メールアドレスをハッシュ化します。
構成図
生データをAmazon S3において、加工して、加工後データをS3に保存して、Amazon Athenaを使ってAmazon S3からデータを確認できるように構成します。
加工前データ
作成リソース
Terraformを使って構築してみました。
Amazon S3
データを保存するバケットを作成します。
- #変数定義
- variable "name" {
- }
- #バケット作成
- resource "aws_s3_bucket" "s3_glue_script" {
- bucket = "${var.name}-test-bucket-1204"
- }
- # 生データ格納
- resource "aws_s3_bucket" "s3_raw" {
- bucket = "${var.name}-raw-bucket-1204"
- }
- # 整形済みデータ格納
- resource "aws_s3_bucket" "s3_formatted" {
- bucket = "${var.name}-formatted-bucket-1204"
- }
AWS Glue
データカタログのデータベース、クローラー、ジョブ、ワークフローを作成します。 IAMロールは別で作成し、そのロール名に変更してください。
- #変数定義
- variable "name" {
- }
- variable "worker_type" {
- }
- #データベース作成
- resource "aws_glue_catalog_database" "glue_db" {
- name = "${var.name}_db"
- }
- #生データのクローラー作成
- resource "aws_glue_crawler" "raw_crawler" {
- database_name = aws_glue_catalog_database.glue_db.name
- name = "${var.name}-raw-crawler"
- role = aws_iam_role.glue_role.name
- s3_target {
- path = "s3://${aws_s3_bucket.s3_raw.bucket}/m_terashima_raw_table"
- }
- }
- #整形済データのクローラー作成
- resource "aws_glue_crawler" "formatted_crawler" {
- database_name = aws_glue_catalog_database.glue_db.name
- name = "${var.name}-formatted-crawler"
- role = aws_iam_role.glue_role.name
- s3_target {
- path = "s3://${aws_s3_bucket.s3_formatted.bucket}/${var.name}_formatted_table"
- }
- }
- # ETLジョブ
- resource "aws_glue_job" "raw_job" {
- name = "${var.name}-raw-job"
- role_arn = aws_iam_role.glue_role.arn
- worker_type = var.worker_type
- number_of_workers = 2
- command {
- script_location = "s3://${aws_s3_bucket.s3_glue_script.bucket}/raw.py"
- python_version = "3"
- }
- default_arguments = {
- "--job-language" = "python"
- }
- }
- #ワークフロー
- resource "aws_glue_workflow" "workflow" {
- name = "${var.name}-workflow"
- }
- resource "aws_glue_trigger" "trigger-start" {
- name = "trigger-start"
- type = "ON_DEMAND"
- workflow_name = aws_glue_workflow.workflow.name
- actions {
- crawler_name = aws_glue_crawler.raw_crawler.name
- }
- }
- resource "aws_glue_trigger" "fin-raw-crawler" {
- name = "fin-raw-crawler"
- type = "CONDITIONAL"
- workflow_name = aws_glue_workflow.workflow.name
- predicate {
- conditions {
- crawler_name = aws_glue_crawler.raw_crawler.name
- crawl_state = "SUCCEEDED"
- }
- }
- actions {
- job_name = aws_glue_job.raw_job.name
- }
- }
- resource "aws_glue_trigger" "fin-raw-job" {
- name = "fin-raw-job"
- type = "CONDITIONAL"
- workflow_name = aws_glue_workflow.workflow.name
- predicate {
- conditions {
- job_name = aws_glue_job.raw_job.name
- state = "SUCCEEDED"
- }
- }
- actions {
- crawler_name = aws_glue_crawler.formatted_crawler.name
- }
- }
加工スクリプト(Python)
- import boto3
- import json
- import sys
- import hashlib
- from pyspark.context import SparkContext
- from awsglue.context import GlueContext
- from awsglue.utils import getResolvedOptions
- from awsglue.job import Job
- from pyspark.sql import functions as F
- from pyspark.sql.types import StringType, DateType
- from pyspark.sql.functions import lit
- from pyspark.sql.functions import to_utc_timestamp
- from awsglue.dynamicframe import DynamicFrame
- ## @params: [JOB_NAME]
- args = getResolvedOptions(sys.argv, ['JOB_NAME'])
- sc = SparkContext()
- glueContext = GlueContext(sc)
- spark = glueContext.spark_session
- job = Job(glueContext)
- job.init(args['JOB_NAME'], args)
- # +---- 格納先のS3バケット名を定義 ----+
- target_s3 = {加工後の格納先S3バケット名}
- # +---- データカタログからGAデータテーブルを取得 ----+
- target_glue_database ={データベース名}
- target_glue_table = {加工前テーブル名}
- ga_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
- database = target_glue_database,
- table_name = target_glue_table,
- transformation_ctx = "create_dynamic_frame"
- )
- # +---- df変換 ----+
- ga_df = ga_dynamic_frame.toDF()
- # +---- 氏名列を削除 ----+
- ga_df = ga_df.drop('氏名')
- # +---- ハッシュ化 ----+
- def mask_df(df, masked_columns):
- hash_udf = F.udf(lambda value: hashlib.sha256(value.encode()).hexdigest() if value is not None else None, StringType())
- for field in masked_columns:
- df = df.withColumn(field, hash_udf(df[field].cast(StringType())))
- return df
- masked_df = mask_df(ga_df, ["メールアドレス"])
- # +---- DynamicFrame変換 ----+
- ga_output_dynamic_fram = DynamicFrame.fromDF(masked_df, glueContext, "ga_output_dynamic_fram")
- # +---- S3へ格納 ----+
- datasink = glueContext.write_dynamic_frame.from_options(
- frame = ga_output_dynamic_fram,
- connection_type = "s3",
- connection_options = {
- "path": {加工後の格納先S3バケットのパス},
- },
- format = "parquet",
- transformation_ctx = "save_s3"
- )
- job.commit()
結果
リソースを展開し、作成されたAWS Glueのワークフローから「Run workflow」を押してスタートします。
全部のノードが正常終了することが確認できました。
加工後データをAmazon Athenaで見ると、想定通りの結果になっていることが確認できます。
おわりに
今回はGlueを使ってETL処理を行いました。AWSにおけるETL処理は、Glueを使えば手軽に行うことができるなと思いました。 もしETL処理が必要な場合は、ぜひ試してみてください!