Amazon Web Services ブログ

AWS DMS、Amazon Kinesis、AWS Glue ストリーミング ETL ジョブを用いて Apache Hudi ベースのニアリアルタイムトランザクションデータレイクを構築し、Amazon QuickSight で可視化

この記事は Create an Apache Hudi-based near-real-time transactional data lake using AWS DMS, Amazon Kinesis, AWS Glue streaming ETL, and data visualization using Amazon QuickSight の翻訳記事です。

テクノロジーの急速な発展に伴い、ますます多くのデータ量が、構造化、半構造化、非構造化など、さまざまな形式で提供されるようになっています。ほぼリアルタイムで業務データをデータ分析することは、一般的なニーズとなりつつあり、データ量の急激な増加により、スケーラビリティとパフォーマンスを向上させるために、リードレプリカをデータレイクに置き換えることが一般的になっています。実際のユースケースの多くでは、リレーショナルデータベースのソースからターゲットにリアルタイムでデータをレプリケートすることが重要です。変更データキャプチャ(CDC)は、ソースデータベースで行われた変更をキャプチャし、他のデータストアに反映するための最も一般的なデザインパターンの1つです。

最近、AWS Glue 4.0 でのストリーミング抽出、変換、およびロード(ETL)ジョブのサポートを発表しました。AWS Glueの新バージョンで、AWS内でのデータ統合ワークロードを高速化します。AWS Glue のストリーミング ETL ジョブは、ストリーミングソースから継続的にデータを取得し、インフライトでデータをクリーンアップおよび変換し、数秒で分析に利用できるようにします。お客様のニーズをサポートするため、AWSは幅広いサービスを提供しています。AWS Database Migration Service(AWS DMS)のようなデータベースレプリケーションサービスは、ソースシステムから Amazon Simple Storage Service(Amazon S3)というデータレイクのストレージ層をとして広く使われているストレージサービスにデータをレプリケートすることができます。リレーショナルデータベース管理システム(RDBMS)に更新を適用するのは簡単ですが、データレイクにこのCDCプロセスを適用するのは困難です。オープンソースのデータ管理フレームワークである Apache Hudi は、インクリメンタルなデータ処理とデータパイプライン開発を簡素化するために使用され、上記の問題を解決するための良い選択肢です。

この投稿では、Amazon Relational Database Service(Amazon RDS)やその他のリレーショナルデータベースから S3 データレイクにニアリアルタイムだけではなく、データの非正規化、変換、エンリッチ可能な柔軟性を持つ CDC の適応方法を紹介します。

ソリューション概要

ソース RDS インスタンスの変更をニアリアルタイムにキャプチャするために AWS DMS を使用し、CDC レプリケーションの宛先として Amazon Kinesis Data Streams を使用します。AWS Glue ストリーミングジョブは Kinesis Data Streams から変更されたレコードを読み込んでエンリッチし、Apache Hudi フォーマットで S3 データレイクにアップサートします。これにより、Amazon Athena でデータをクエリし、Amazon QuickSight で可視化することは可能になります。AWS Glue は、Apache Hudi ベースのテーブルにストリーミングデータの継続的な書き込み操作をネイティブにサポートしています。

以下の図は、この投稿で使用されたアーキテクチャを示しています。AWS CloudFormation テンプレートも提供します。

前提条件

始める前に、以下の前提条件があることを確認してください:

  • AWSアカウント
  • Amazon S3の基本的な理解
  • ダッシュボードを作成するためのQuickSightの基本的な理解
  • Amazon RDSデータベース、AWS DMSインスタンスとタスク、Kinesisデータストリーム、S3バケット、AWS Glueジョブ、AWS Glueデータカタログ、およびQuickSightダッシュボードを作成し、Athenaを使用してSQLクエリを実行するための権限を持つAWS Identity and Access Management(IAM)ロール(IAMアイデンティティ権限の追加と削除を参照してください。)

ソースデータ概要

今回は ticket_activity テーブルを使用してスポーツイベントのほぼリアルタイムのデータを分析することに興味があるデータアナリスト向けのユースケースを想定します。このテーブルの例を次のスクリーンショットに示します。

Apache Hudi connector for AWS Glue

この投稿では、Hudi フレームワークをネイティブサポートしている AWS Glue 4.0 を使用します。Hudi はオープンソースのデータレイクフレームワークで、Amazon S3 上に構築されたデータレイクにおけるインクリメンタルなデータ処理を簡素化します。タイムトラベルクエリ、ACID(原子性、整合性、分離性、永続性)トランザクション、ストリーミングデータ取り込み、CDC、アップサート、および削除などの機能が利用できます。

AWS CloudFormation を用いてリソースセットアップ

迅速なセットアップのために CloudFormation テンプレートが用意しました。ご自身のニーズに合わせて見直してカスタマイズすることができます。
CloudFormation テンプレートは以下のリソースを生成します:

  • RDS データベースインスタンス(ソース)
  • AWS DMS レプリケーションインスタンス、ソーステーブルから Kinesis データストリームにデータをレプリケートするために使用します
  • Kinesis データストリーム
  • 4つの AWS Glue Python シェルジョブ:
    • rds-ingest-rds-setup-<CloudFormation Stack name> – Amazon RDS 上に ticket_activity というソーステーブルを 1 つ作成します
    • rds-ingest-data-initial-<CloudFormation Stack name>Faker ライブラリでサンプルデータがランダムに自動生成され、ticket_activity テーブルにロードされます
    • rds-ingest-data-incremental-<CloudFormation Stack name> – 新しいチケットアクティビティデータを継続的にソーステーブル ticket_activity に取り込みます。このジョブは顧客のアクティビティをシミュレートします
    • rds-upsert-data-<CloudFormation Stack name> – ソーステーブル ticket_activity に特定のレコードをアップサートします。このジョブは管理者の活動をシミュレートします
  • AWS Identity and Access Management(IAM)のユーザーとポリシー
  • Amazon VPC、パブリックサブネット、2つのプライベートサブネット、インターネットゲートウェイ、NAT ゲートウェイ、ルートテーブル
    • プライベートサブネットは、RDS データベースインスタンスと AWS DMS レプリケーションインスタンスのため作られます
    • NAT ゲートウェイは、AWS Glue の Python シェルジョブから Python 用 MySQL コネクタを使用するために pypi.org への到達性を確保するために使用します。また、Kinesis Data Streams と Amazon S3 API エンドポイントへのアクセスも可能ようにします

これらのリソースをセットアップするには、以下の前提条件が必要です:

以下の図は、プロビジョニングされたリソースのアーキテクチャを示しています。

以下の手順で CloudFormation スタックを起動します:

  1. AWS CloudFormation コンソールにサインインします
  2. Launch Stack を選択します
  3. Next を選択します
  4. S3BucketName には、新しいS3バケットの名前を入力します
  5. VPCCIDR には、既存のネットワークと競合しない CIDR IP アドレス範囲を入力します
  6. PublicSubnetCIDR には、VPCCIDR に指定した CIDR 内の CIDR IP アドレス範囲を入力します
  7. PrivateSubnetACIDRPrivateSubnetBCIDR には、VPCCIDR で指定した CIDR 内の CIDR IP アドレス範囲を入力します

  1. SubnetAzA および SubnetAzB には、使用するサブネットを選択します
  2. DatabaseUserName には、データベースユーザー名を入力します
  3. DatabaseUserPassword には、データベース・ユーザーのパスワードを入力します
  4. Next を選択します

  1. 次のページで、Next を選択します
  2. 最後のページで詳細を確認し、I acknowledge that AWS CloudFormation may create IAM resources with custom names をチェック入れます
  3. Create stack を選択します

スタックの作成には約 20 分かかります

初期ソーステーブルの設定

AWS Glueジョブ、rds-ingest-rds-setup-<CloudFormation Stack name> は、RDS データベースインスタンス上に event というソーステーブルを作成します。Amazon RDS で初期ソーステーブルをセットアップするには、以下の手順を実行します:

  1. AWS Glue のコンソールで、ナビゲーションペインの Jobs を選択します。
  2. rds-ingest-rds-setup-<CloudFormation Stack name> を選択し、ジョブを開きます。
  3. Run をクリックします

  1. Runs タブに移動し、Run ステータスが SUCCEEDED と表示されるのを待ちます。

This job will only create the one table, ticket_activity, in the MySQL instance (DDL). See the following code:

CREATE TABLE ticket_activity (
ticketactivity_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
sport_type VARCHAR(256) NOT NULL,
start_date DATETIME NOT NULL,
location VARCHAR(256) NOT NULL,
seat_level VARCHAR(256) NOT NULL,
seat_location VARCHAR(256) NOT NULL,
ticket_price INT NOT NULL,
customer_name VARCHAR(256) NOT NULL,
email_address VARCHAR(256) NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL )

新しいレコードの取り込み

このセクションでは、新しいレコードの取り込み手順を詳しく説明する。以下の手順でジョブを実行します。

AWS DMS を使用した Kinesis Data Streams へのデータ取り込みの開始

Amazon RDS から Kinesis Data Streams へのデータ取り込みを開始するには、以下の手順を実行します:

  1. AWS DMS コンソールで、ナビゲーションペインのデータベース移行タスクを選択します。
  2. タスク rds-to-kinesis-<CloudFormation Stack name> を選択します。
  3. アクションメニューで、開始/再開を選択する。
  4. ステータスLoad complete および Replication ongoing と表示されるまで待ちます。

AWS DMS レプリケーションタスクは、Amazon RDS から Kinesis Data Streams へ継続的にデータを取り込みます。

Amazon S3 へのデータ取り込み

次に、Kinesis Data Streams から Amazon S3 へのデータ取り込みを開始するために、以下の手順を実行します:

  1. AWS Glue コンソールで、ナビゲーションペインの Jobs を選択します。
  2. streaming-cdc-kinesis2hudi-<CloudFormation Stack name> を選択してジョブを開きます。
  3. Run を選択します。

Runs タブで実行状況を確認し、Running と表示されるのを待ちます。

Amazon RDS 上のソーステーブルへのデータロード

Amazon RDS 上のソーステーブルへのデータ取り込みを開始するには、以下の手順を実行します:

  1. AWS Glue コンソールで、ナビゲーションペインの Jobs を選択します。
  2. rds-ingest-data-initial-<CloudFormation Stack name> を選択して、ジョブを開きます。
  3. Run を選択します。
  4. Runs タブに移動し、Run ステータスSUCCEEDED と表示されるのを待ちます。

取り込んだデータの検証

ジョブの開始から約 2 分後、データは Amazon S3 に取り込まれるはずです。Athena に取り込まれたデータを検証するには、以下の手順を実行します:

  1. Athena コンソールで、初めて Athena クエリを実行する場合、以下のステップを完了します:
    • 設定タブで管理を選択します。
    • Athena がクエリ結果を保存するS3 パスを指定します。
    • 保存を選択します。
  2. Editor タブで、テーブルに対して以下のクエリを実行し、データをチェックします:
SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" limit 10;

AWS CloudFormation は下記のような実行環境のアカウント ID を名前に入れ込んだデータベースを作成します
database_<your-account-number>_hudi_cdc_demo

既存のレコードを更新する

既存のレコードを更新する前に、ticket_activity テーブルからレコードの ticketactivity_id 値をメモしてください。Athena を使って以下の SQL を実行してください。この投稿では、例として ticketactivity_id = 46 を使用します:

SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" limit 10;

リアルタイムのユースケースをシミュレートするには、RDS データベースインスタンスのソーステーブル ticket_activity のデータを更新し、更新されたレコードが Amazon S3 にレプリケートされることを確認します。次の手順を実行します:

  1. AWS Glue のコンソールで、ナビゲーションペインの Jobs を選択します。
  2. rds-ingest-data-incremental-<CloudFormation Stack name> を選択し、ジョブを開きます。
  3. Run を選択します。
  4. Runs タブを選択し、Run statusSUCCEEDED と表示されるのを待ちます。

ソーステーブルのレコードをアップサートするには、以下の手順を実行します:

  1. AWS Glue のコンソールで、ナビゲーションペインの Jobs を選択します。
  2. ジョブ rds-upsert-data-<CloudFormation Stack name> を選択します。
  3. Job details タブの Advanced propertiesJob parameters で、以下のパラメータを更新します:
    • Key に –ticketactivity_id を入力します。
    • Value には 1 を上記で指定したチケット ID の 1 つ(この記事では 46)に置き換えてください。
  4. Save を選択します。
  5. Run を選択し、Run statusSUCCEEDED と表示されるのを待ちます。

この AWS Glue Python シェルジョブは、チケットを購入する顧客のアクティビティをシミュレートします。ジョブの引数. --ticketactivity_id で渡されたチケット ID を使用して、RDS データベースインスタンスのソーステーブル ticket_activity のレコードを更新します。これは ticket_price=500updated_at を現在のタイムスタンプで更新します。
Amazon s3 に取り込まれたデータを検証するために、Athena から同じクエリを実行し、先ほどの ticket_activity の値をチェックし、ticket_priceupdated_at フィールドを観察します。

SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" where ticketactivity_id = 46 ;

QuickSight でデータ可視化

AWS Glue ストリーミングジョブによって生成された出力ファイルを S3 バケットに入れたら、QuickSight を使って Hudi データファイルを可視化することができます。QuickSight は、クラウド用に構築されたスケーラブルでサーバーレス、組み込み可能な ML を利用したビジネスインテリジェンス(BI)サービスです。QuickSight を使えば、ML を活用した洞察を含むインタラクティブな BI ダッシュボードを簡単に作成、公開することができます。QuickSight ダッシュボードは、あらゆるデバイスからアクセスでき、アプリケーション、ポータル、ウェブサイトにシームレスに組み込むことができます。

QuickSight ダッシュボードの作成

QuickSight ダッシュボードを構築するには、以下の手順を実行します:

  1. QuickSight コンソールを開きます。

QuickSight のウェルカムページが表示されます。QuickSight にサインアップしていない場合は、サインアップ・ウィザードを完了する必要があります。詳細については、Amazon QuickSight サブスクリプションのサインアップを参照してください。

サインアップ後、QuickSight は “ようこそウィザード “を表示します。短いチュートリアルを表示することも、閉じることもできます。

  1. QuickSight コンソールでユーザー名を選択し、QuickSight を管理を選択します。
  2. セキュリティとアクセス権限を選択し、管理を選択します。
  3. Amazon S3 を選択し、先ほど AWS CloudFormation で作成したバケットを選択します。
  4. Amazon Athena を選択します。
  5. Save を選択します。
  6. QuickSight はリージョンを意識せずご利用いただけますが、後続の操作を行う前に、AWS Glue 等で使用したリージョンに戻してください。

データセットの作成

QuickSight を起動して実行できるようになったので、データセットを作成できます。以下の手順を実行します:

  1. QuickSight コンソールのナビゲーションでデータセットを選択します。
  2. 新規データセットを選択します。
  3. Athena を選択します。
  4. データソース名 には名前を入力します(例:hudi-blog)。
  5. Validate を選択肢します。
  6. 検証が成功したら、Create data source を選択します。
  7. データベースは、database_<your-account-number>_hudi_cdc_demo を選択します。
  8. テーブル ticket_activity を選択します。
  9. Select を選択します。
  10. Visualize を選択します
  11. 時間、ticket_activity_id の順に選択すると、時間ごとの ticket_activity_id のカウントを取得できます。

後片付け

料金が発生しないようにこのソリューションを環境内でテスト目的で使用していた場合は、以下の手順で後片付けます:

  1. AWS DMS レプリケーションタスク rds-to-kinesis-<CloudFormation Stack name> を停止します。
  2. RDS データベースに移動し、変更を選択します。
  3. 削除保護を有効にするの選択を解除し、続行を選択します。
  4. AWS Glue のストリーミングジョブ streaming-cdc-kinesis2redshift-<CloudFormation Stack name> を停止します。
  5. CloudFormation スタックを削除します。
  6. QuickSight ダッシュボードで、ユーザー名を選択し、QuickSightを管理を選択します。
  7. アカウント設定を選択し、アカウントの終了を選択します。
  8. アカウントの終了を選択して確認します。
  9. 確認を入力し、アカウントを削除を選択します。

まとめ

この投稿では、Apache Hudi ベースのほぼリアルタイムのトランザクションデータレイクを作成するために、AWS Glue ストリーミングジョブを使用して、新しいレコードだけでなく、リレーショナルデータベースから更新されたレコードも Amazon S3 にストリーミングする方法を示しました。このアプローチによって、Amazon S3 でアップサートのユースケースを簡単に実現できます。また、QuickSight と Athena を使って Apache Hudi のテーブルを可視化する方法も紹介した。次のステップとして、大容量データセット用の Apache Hudi パフォーマンスチューニングガイドを参照してください。QuickSight でのダッシュボードのオーサリングの詳細については、QuickSightダッシュボード作成体験ワークショップ(日本語)QuickSight オーサーワークショップ(英語)をご覧ください。

翻訳はソリューションアーキテクトの Rui Lee が担当しました。原文はこちらです。