NRIネットコム Blog

NRIネットコム社員が様々な視点で、日々の気づきやナレッジを発信するメディアです

AWS Glueを使ったETL処理をTerraformで構築してみた

はじめに

はじめまして、入社1年目の寺嶋です。現在私はAWSを使ったデータ分析基盤の構築、保守、運用を行っています。 本ブログでは、データ分析基盤構築に日々携わっている中で理解したAWS Glueの基礎と簡単な構築例を紹介します。最後にTerraformを使って構築したリソースも載せるので、少しでも参考になったら嬉しいです!

AWS Glueとは

AWS Glue は、分析を行うユーザーが複数のソースからのデータを簡単に検出、準備、移動、統合できるようにするサーバーレスのデータ統合サービスです。100 を超える多様なデータソースを検出して接続し、一元化されたデータカタログでデータを管理し、ETL パイプラインを視覚的に作成、実行、モニタリングして、データをデータレイクにロードできます。

一般的なワークフロー(※1)に含まれる内容は次のとおりです。

  1. データカタログ(※2)でデータソースおよびターゲットを定義します。
  2. クローラー(※3)を使用して、データソースのテーブルメタデータでデータカタログに入力します。
  3. 変換スクリプトを使用して ETL ジョブ(※4)を定義し、データを移動および処理します。
  4. ジョブをオンデマンドまたはトリガーに基づいて実行します。
  5. ダッシュボードを使用してジョブのパフォーマンスをモニタリングします。

https://d1.awsstatic.com/webinars/jp/pdf/services/20210330_AWSBlackBelt2021_AWSGlueStudio.pdf

※1 ワークフロー: 複数のクローラ、ジョブ、およびトリガーを伴う複雑な ETL (抽出、変換、ロード) アクティビティを作成して可視化します。

Workflow

※2 データカタログ : AWSのクラウド内でどこにどんなデータがあるのか、どんな情報が入っているのかを記録して管理する場所です。 簡単に言うと「データの住所録」みたいなものです。

※3 クローラー : データの中身を調べて、それがどんなデータなのかを理解し、データカタログにその情報を自動で追加してくれます。

※4 ETLジョブ : ソースからデータの抽出、Apache Spark スクリプトを使用して変換、ターゲットにロードするビジネスロジックです。ここにデータ処理するスクリプトを記述します。

構築例

今回はAWS Glueを使った簡単なデータ分析基盤を構築したいと思います。 データは、ID、氏名、年齢、性別、メールアドレスのカラムをもちます。 データ加工(ETL処理)では、氏名を削除し、メールアドレスをハッシュ化します。

構成図

生データをAmazon S3において、加工して、加工後データをS3に保存して、Amazon Athenaを使ってAmazon S3からデータを確認できるように構成します。

加工前データ

作成リソース

Terraformを使って構築してみました。

Amazon S3

データを保存するバケットを作成します。

  1. #変数定義
  2. variable "name" {
  3. }
  4. #バケット作成
  5. resource "aws_s3_bucket" "s3_glue_script" {
  6. bucket = "${var.name}-test-bucket-1204"
  7. }
  8. # 生データ格納
  9. resource "aws_s3_bucket" "s3_raw" {
  10. bucket = "${var.name}-raw-bucket-1204"
  11. }
  12. # 整形済みデータ格納
  13. resource "aws_s3_bucket" "s3_formatted" {
  14. bucket = "${var.name}-formatted-bucket-1204"
  15. }

AWS Glue

データカタログのデータベース、クローラー、ジョブ、ワークフローを作成します。 IAMロールは別で作成し、そのロール名に変更してください。

  1. #変数定義
  2. variable "name" {
  3. }
  4. variable "worker_type" {
  5. }
  6. #データベース作成
  7. resource "aws_glue_catalog_database" "glue_db" {
  8. name = "${var.name}_db"
  9. }
  10. #生データのクローラー作成
  11. resource "aws_glue_crawler" "raw_crawler" {
  12. database_name = aws_glue_catalog_database.glue_db.name
  13. name = "${var.name}-raw-crawler"
  14. role = aws_iam_role.glue_role.name
  15. s3_target {
  16. path = "s3://${aws_s3_bucket.s3_raw.bucket}/m_terashima_raw_table"
  17. }
  18. }
  19. #整形済データのクローラー作成
  20. resource "aws_glue_crawler" "formatted_crawler" {
  21. database_name = aws_glue_catalog_database.glue_db.name
  22. name = "${var.name}-formatted-crawler"
  23. role = aws_iam_role.glue_role.name
  24. s3_target {
  25. path = "s3://${aws_s3_bucket.s3_formatted.bucket}/${var.name}_formatted_table"
  26. }
  27. }
  28. # ETLジョブ
  29. resource "aws_glue_job" "raw_job" {
  30. name = "${var.name}-raw-job"
  31. role_arn = aws_iam_role.glue_role.arn
  32. worker_type = var.worker_type
  33. number_of_workers = 2
  34. command {
  35. script_location = "s3://${aws_s3_bucket.s3_glue_script.bucket}/raw.py"
  36. python_version = "3"
  37. }
  38. default_arguments = {
  39. "--job-language" = "python"
  40. }
  41. }
  42. #ワークフロー
  43. resource "aws_glue_workflow" "workflow" {
  44. name = "${var.name}-workflow"
  45. }
  46. resource "aws_glue_trigger" "trigger-start" {
  47. name = "trigger-start"
  48. type = "ON_DEMAND"
  49. workflow_name = aws_glue_workflow.workflow.name
  50. actions {
  51. crawler_name = aws_glue_crawler.raw_crawler.name
  52. }
  53. }
  54. resource "aws_glue_trigger" "fin-raw-crawler" {
  55. name = "fin-raw-crawler"
  56. type = "CONDITIONAL"
  57. workflow_name = aws_glue_workflow.workflow.name
  58. predicate {
  59. conditions {
  60. crawler_name = aws_glue_crawler.raw_crawler.name
  61. crawl_state = "SUCCEEDED"
  62. }
  63. }
  64. actions {
  65. job_name = aws_glue_job.raw_job.name
  66. }
  67. }
  68. resource "aws_glue_trigger" "fin-raw-job" {
  69. name = "fin-raw-job"
  70. type = "CONDITIONAL"
  71. workflow_name = aws_glue_workflow.workflow.name
  72. predicate {
  73. conditions {
  74. job_name = aws_glue_job.raw_job.name
  75. state = "SUCCEEDED"
  76. }
  77. }
  78. actions {
  79. crawler_name = aws_glue_crawler.formatted_crawler.name
  80. }
  81. }

加工スクリプト(Python)

  1. import boto3
  2. import json
  3. import sys
  4. import hashlib
  5. from pyspark.context import SparkContext
  6. from awsglue.context import GlueContext
  7. from awsglue.utils import getResolvedOptions
  8. from awsglue.job import Job
  9. from pyspark.sql import functions as F
  10. from pyspark.sql.types import StringType, DateType
  11. from pyspark.sql.functions import lit
  12. from pyspark.sql.functions import to_utc_timestamp
  13. from awsglue.dynamicframe import DynamicFrame
  14. ## @params: [JOB_NAME]
  15. args = getResolvedOptions(sys.argv, ['JOB_NAME'])
  16. sc = SparkContext()
  17. glueContext = GlueContext(sc)
  18. spark = glueContext.spark_session
  19. job = Job(glueContext)
  20. job.init(args['JOB_NAME'], args)
  21. # +---- 格納先のS3バケット名を定義 ----+
  22. target_s3 = {加工後の格納先S3バケット名}
  23. # +---- データカタログからGAデータテーブルを取得 ----+
  24. target_glue_database ={データベース名}
  25. target_glue_table = {加工前テーブル名}
  26. ga_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
  27. database = target_glue_database,
  28. table_name = target_glue_table,
  29. transformation_ctx = "create_dynamic_frame"
  30. )
  31. # +---- df変換 ----+
  32. ga_df = ga_dynamic_frame.toDF()
  33. # +---- 氏名列を削除 ----+
  34. ga_df = ga_df.drop('氏名')
  35. # +---- ハッシュ化 ----+
  36. def mask_df(df, masked_columns):
  37. hash_udf = F.udf(lambda value: hashlib.sha256(value.encode()).hexdigest() if value is not None else None, StringType())
  38. for field in masked_columns:
  39. df = df.withColumn(field, hash_udf(df[field].cast(StringType())))
  40. return df
  41. masked_df = mask_df(ga_df, ["メールアドレス"])
  42. # +---- DynamicFrame変換 ----+
  43. ga_output_dynamic_fram = DynamicFrame.fromDF(masked_df, glueContext, "ga_output_dynamic_fram")
  44. # +---- S3へ格納 ----+
  45. datasink = glueContext.write_dynamic_frame.from_options(
  46. frame = ga_output_dynamic_fram,
  47. connection_type = "s3",
  48. connection_options = {
  49. "path": {加工後の格納先S3バケットのパス},
  50. },
  51. format = "parquet",
  52. transformation_ctx = "save_s3"
  53. )
  54. job.commit()

結果

リソースを展開し、作成されたAWS Glueのワークフローから「Run workflow」を押してスタートします。

全部のノードが正常終了することが確認できました。

加工後データをAmazon Athenaで見ると、想定通りの結果になっていることが確認できます。

おわりに

今回はGlueを使ってETL処理を行いました。AWSにおけるETL処理は、Glueを使えば手軽に行うことができるなと思いました。 もしETL処理が必要な場合は、ぜひ試してみてください!