
Apache Spark
イベント
該当するコンテンツが見つかりませんでした
マガジン
技術ブログ
本記事は アマゾン ウェブ サービス ジャパン合同会社 ソリューションアーキテクト 疋田、畠 と、Fivetran による共著です。 はじめに 本記事では、 Fivetran の Managed Data Lake Service 及び CDC 機能を活用して業務システムの RDBMS から Amazon S3 上の Apache Iceberg テーブルへリアルタイムにデータ連携が必要となるユースケースや構成イメージ、実装例を記載します。 本記事では、業務システムの RDBMS からリアルタイムにデータを連携するユースケースを紹介します。また、 Fivetran の Fivetran Managed Data Lake Service 及び CDC 機能を用いて Amazon S3 上の Apache Iceberg テーブルを活用する構成と実装例をご紹介します。 お客様の業務システムには、受注・在庫・会計といった大量のトランザクションデータが蓄積されています。これらのデータを分析やビジネス上の意思決定に活用したいというニーズは年々高まっており、近年では生成 AI の学習データ基盤としてもオープンテーブルフォーマットを活用したデータ基盤へのデータ集約が注目されています。 しかし、業務システムの RDBMS 上で直接分析クエリを実行すると、本来のトランザクション処理に影響を及ぼすリスクがあります。受発注や在庫更新のような日常的なデータの読み書きトランザクション処理は OLTP(Online Transaction Processing) 、大量データの集計や傾向分析などの分析処理は OLAP(Online Analytical Processing) と呼ばれ、それぞれ求められる処理特性が異なります。高スペックな RDBMS を用いて OLTP および OLAP を単一のデータベース基盤で処理する方式の場合、最新のデータを扱えますが、ハードウェアやライセンス等のコストが増大します。夜間バッチで OLAP 基盤へデータを連携する方法では、コストを抑えられますが、データの鮮度がバッチの実行間隔に依存するといったトレードオフが生じます。 こうした課題を解決する手段の一つが、 Change Data Capture(CDC) を用いた業務システムとデータ基盤の連携です。CDC はデータベースの変更をリアルタイムに検知・取得する技術で、業務システムへの負荷を最小限に抑えながらデータを連携できます。 本ブログでは、AWS Data and Analytics コンピテンシーパートナーである Fivetran の Managed Data Lake Service (MDLS) 機能を活用し、業務システムの RDBMS から Amazon S3 上の Apache Iceberg テーブルへデータを連携する方法をご紹介します。Apache Iceberg は大規模な分析データセットを管理するためのオープンテーブルフォーマットで、ACID トランザクションやスキーマ進化、タイムトラベルといった機能を提供します。連携されたデータは AWS Glue データカタログに登録され、 Amazon Redshift 、 Amazon Athena 等からすぐにクエリ・分析が可能です。 業務システムから Iceberg テーブルへの CDC データ連携のユースケース 業務システムで用いられるデータベース基盤から Apache Iceberg テーブルへ CDC でデータを連携するユースケースとして、以下のようなものが挙げられます。 OLTP と OLAP の分離によるデータベース基盤のダウンサイジング オンプレミスの高性能なデータベース基盤では、 OLTP と OLAP の両方を単一の基盤上で実行しているケースが少なくありません。OLAP ワークロードに対応するために基盤のスペックが引き上げられ、結果としてライセンスコストやハードウェアコストが増大しているという状況は、多くのお客様に共通する課題です。 これに対して、CDC を用いて業務データを Apache Iceberg テーブルに連携することで、OLAP ワークロードを Amazon Athena にオフロードできます。Fivetram Managed Data Lake Service が連携したデータは AWS Glue データカタログに自動的に登録されるため、Athena からすぐにクエリを開始できます。これにより、既存のデータベース基盤は本来の OLTP 処理に専念でき、スペックの適正化やダウンサイジングが可能になります。Amazon Athena はサーバーレスサービスであるため、インフラストラクチャの管理が不要で、クエリのデータスキャン量に応じた従量課金で利用できます。 バッチ処理のオフロード 多くの企業では、業務システムのデータを分析用データベースに連携するために、夜間バッチによる定期的なデータ抽出を行っています。この方式では、データの鮮度がバッチの実行間隔に依存するため、日中に発生した変更が分析に反映されるのは翌日以降になる場合も少なくありません。また、バッチ処理自体がデータベースに大きな負荷をかけるため、業務時間外に実行せざるを得ないという制約もあります。 CDC を活用すれば、データベースのトランザクションログから変更データをリアルタイムに取得可能です。これにより、業務システムへの負荷を最小限に抑えながら、データの鮮度を大幅に向上でき、夜間バッチの廃止や実行頻度の削減等を狙えます。また、Fivetran のようなサーバーレスなマネージドサービスを用いることで運用負荷の軽減や、後述の Fivetran Managed Data Lake Service のような多くの機能を素早く利用可能です。 履歴データの保持と分析 業務システムでは、データは常に最新の状態に更新されるため、過去のある時点の状態を参照することが困難です。たとえば、「ある顧客の住所が変更される前の情報」や「商品の価格改定前の単価」といった履歴情報は、通常の RDBMS 上では保持されません。 Fivetran の CDC は データウェアハウスにおける履歴管理手法である Slowly Changing Dimension Type 2(SCD Type 2)に対応しています。SCD Type 2 により、レコードの変更履歴を保持する形式でデータを連携できます。これにより、Apache Iceberg テーブル上に変更の履歴が蓄積され、「いつ、どのように変更されたか」を後から分析することが可能になります。Iceberg のタイムトラベル機能と組み合わせることで、任意の時点のデータスナップショットを参照することもできます。 Fivetranとは? エンタープライズグレードのCDCとデータ統合の自動化 現代のデータアーキテクチャは、Amazon S3 を核として、オープンで柔軟な基盤へと進化しています。組織がレイクハウス型の分析へと移行するにつれ、Apache Iceberg のようなオープンテーブル形式でデータを取得することが、データのポータビリティと将来への対応を確保する上で不可欠になっています。 エンタープライズグレードのデータ移動: High-Volume Agent(HVA)と Binary Log Reader データエンジニアリングにおける最大の課題は、データの取り込みだけでなく、ミッションクリティカルなワークロードの長期的な信頼性と拡張性を維持することです。Fivetran は 700 を超えるフルマネージドコネクタを提供し、多様なデータソースからの連携をノーコードで実現します。特に Oracle(Oracle Exadata を含む)のような要求の厳しい環境向けには、高度な Binary Log Reader 技術を活用した低レイテンシのログベース CDC に対応しています。Oracle Database 19c 以降に最適化されたこの仕組みは、REDO ログを直接分析することで LogMiner などの従来型ツールのオーバーヘッドを回避します。多くの場合、ソースに近い環境に配置した High-Volume Agent(HVA) を介して REDO ログを読み取ることで、テラバイト規模のデータであっても本番環境への影響を最小限に抑えながらリアルタイムに移動できます。これにより、コアビジネスシステムの安定性を損なうことなく、シームレスな OLAP オフロードが可能になります。 自動化されたガバナンスと将来を見据えたテーブル Fivetran MDLS は、スキーマの変更を自動的に検知・反映することで、スキーマドリフトに自動的に対応します。また、レコードの変更履歴が保持される SCD Type 2 のサポートにより、Iceberg テーブル管理の運用負担を軽減します。これにより、ソーススキーマが変化しても、下流の分析の一貫性と「将来への対応」が確保されます。 検出を効率化するため、Fivetran は AWS Glue データカタログとネイティブに統合されています。Amazon S3 で Icebergテーブルが更新されると共に、メタデータが自動的に同期されます。これにより、Amazon Athena 経由でデータセットを即座に検出及びクエリできるようになります。Fivetran の自動データ移動と AWS のスケーラブルなインフラストラクチャを組み合わせることで、チームはエンタープライズ規模の AI と分析に対応できる、ガバナンスが効いた高性能なデータ基盤を構築できます。 さらに、Fivetran MDLS はデータのロード・更新にとどまらず、Icebergテーブルのパフォーマンスを継続的に維持するための自動管理機能をバックグラウンドで実行します。具体的には、クエリ性能を最適化するための小さなファイルの統合(コンパクション)、不要な孤立ファイルの削除、古いスナップショットの自動期限切れ処理などが含まれます。これらの運用タスクが自動化されることで、チームはインフラ管理ではなくデータ活用に集中できます。 また、Fivetran MDLS を活用することで、各ツールやシステムがデータのコピーを個別に持つ必要がなくなります。データは Amazon S3 上の Iceberg テーブルとして一元管理されるため、ストレージコストを抑えながら「単一の真実のバージョン(Single Source of Truth)」を実現し、組織全体のデータサイロを防ぐことができます。 構成の概要とシナリオ ここでは、PostgreSQL で稼働する業務システムのデータを分析基盤に連携するシナリオを例に、具体的な手順をご紹介します。業務データベースに対して分析用途の集計クエリを直接実行すると本番ワークロードへの負荷が懸念されます。加えて、一部のテーブルについては各レコードの変更履歴を追跡できる形でデータを蓄積する必要があります。 そこで、Fivetran を業務データベースに接続し、Amazon S3 上に Iceberg 形式でデータを蓄積するパイプラインを構築します。これにより、業務システムの OLTP ワークロードと分析の OLAP ワークロードを分離しつつ、両者を継続的に連携できます。S3 に書き込まれた Iceberg テーブルのメタデータは AWS Glue Data Catalog に登録されるため、Amazon Athena から即座にクエリが可能です。 今回の構成は以下の通りです。 ソース: PostgreSQL データベース CDC 処理・データレイク管理: Fivetran MDLS ターゲット: Amazon S3 + AWS Glue Data Catalog(Apache Iceberg 形式) クエリエンジン: Amazon Athena Iceberg 形式のデータは、Amazon Athena 以外にも Amazon Redshift や Apache Spark、Trino など Iceberg をサポートする様々なクエリエンジンからデータをコピーすることなく参照できます。要件の変化に応じて最適なツールを使い分けることが可能です。 サンプルデータ 今回のウォークスルーでは、業務システムを模した以下の 2 つのテーブルを使用します。 orders テーブル(受注データ)は、日々の受注を記録するトランザクションテーブルです。Fivetran MDLS による通常の CDC で連携し、INSERT/UPDATE/DELETE がそのまま Iceberg テーブルに反映される様子を確認します。 カラム名 型 説明 id SERIAL (PK) 受注ID product_id INTEGER 商品ID quantity INTEGER 数量 total_price NUMERIC(10,2) 合計金額 status VARCHAR(20) ステータス(pending / confirmed / shipped / cancelled) ordered_at TIMESTAMP 受注日時 初期データとして、以下の 5 件の受注が登録されています。 id product_id quantity total_price status 1 1 1 128,000 confirmed 2 2 2 90,000 shipped 3 3 1 18,000 pending 4 5 3 10,500 confirmed 5 4 2 17,800 pending products テーブル(商品マスタ)は、商品の基本情報を管理するマスタテーブルです。Fivetran MDLS の SCD Type 2 を使った連携方式を使用し、価格改定などの更新が行われた際に変更前のレコードが履歴として保持される様子を確認します。 カラム名 型 説明 id SERIAL (PK) 商品ID name VARCHAR(100) 商品名 category VARCHAR(50) カテゴリ unit_price NUMERIC(10,2) 単価 is_active BOOLEAN 販売中フラグ updated_at TIMESTAMP 更新日時 初期データとして、以下の 5 商品が登録されています。 id name category unit_price 1 ノートPC 14インチ PC 128,000 2 モニター 27インチ 周辺機器 45,000 3 メカニカルキーボード 周辺機器 18,000 4 ワイヤレスマウス 周辺機器 8,900 5 USBハブ 7ポート アクセサリ 3,500 セットアップ手順 ここからは、実際に環境を構築しながら手順を説明します。全体の流れは以下の通りです。 AWS 側の準備(S3 バケット、IAM ロール、Glue データカタログ) Fivetran のデスティネーション(データレイク)設定 ソースデータベース(Aurora PostgreSQL)の準備 Fivetran のコネクター設定と CDC 連携の開始 AWS 側の準備 S3 バケットの作成 Iceberg テーブルのデータファイルとメタデータを格納する S3 バケットを作成します。詳細は Amazon S3 の開始方法 を参照してください。 IAM ロールの作成 Fivetran が S3 バケットへの書き込みと Glue Data Catalog の操作を行うための IAM ロールを作成します。必要な IAM ポリシーや信頼ポリシーの設定手順は、Fivetran の Managed Data Lake Service セットアップガイド に記載されています。 Fivetran のデスティネーション設定 Fivetran のダッシュボードから、データの書き込み先となるデスティネーションを設定します。今回は S3 Data Lake を選択し、作成した S3 バケットと IAM ロールの情報を入力します。設定の詳細は Fivetran の Managed Data Lake Service セットアップガイド を参照してください。 設定の中で、Update AWS Glue Catalog を有効にすると、Fivetran が Iceberg テーブルのメタデータを Glue Data Catalog に自動登録するようになり、Amazon Athena などからすぐにクエリできる状態になります。 設定が完了すると、以下のようにデスティネーションが登録されます。 ソースデータベースの準備 今回は Amazon Aurora PostgreSQL をソースデータベースとして使用します。論理レプリケーションの有効化やユーザーの作成など、ソース側の設定は Fivetran の Aurora PostgreSQL セットアップガイド に従って進めます。 セットアップガイドに従い、Fivetran 専用のデータベースユーザーの作成と読み取り権限・レプリケーション権限の付与、Publication とレプリケーションスロットの作成を行います。 Fivetran のコネクション設定 Fivetran では、デスティネーションに対してデータソースごとのコネクションを作成することでデータパイプラインを構成します。先ほど作成したデスティネーションに対して、 Amazon Aurora PostgreSQL へのコネクションを追加します。PostgreSQL を選択し、Aurora クラスターのエンドポイントや認証情報を入力します。 データベースへの接続方法は直接接続のほか SSH トンネルや AWS PrivateLink にも対応しています。また、増分同期の方式(Update Method)には Logical Replication と Query-Based の 2 種類があります。Logical Replication は WAL(Write-Ahead Log)から変更を検知する方式で、Aurora PostgreSQL バージョン 10 以降で利用できます。今回はこちらを選択し、先ほど作成した Replication Slot と Publication Name を指定します。 設定が完了すると、以下のようにコネクションが登録されます。 データ連携の設定と同期の開始 コネクションの設定画面では、同期対象のデータや連携方式に関する様々な設定を行うことができます。 コネクションの Schema タブでは、同期対象のテーブルやカラムを個別に選択できます。不要なテーブルを除外したり、機密性の高いカラムを連携対象から外すといった制御が可能です。 また、ソース側でテーブルやカラムが追加された場合の挙動を Schema Change Handling で制御できます。すべて自動で同期する(Allow all)、既存テーブルへのカラム追加のみ同期する(Allow columns)、すべてブロックする(Block all)の 3 つのオプションから選択できます。 「Sync Mode」により、テーブルへの変更履歴の反映方法を設定できます。ソースの変更をそのまま反映したい場合は、「Soft delete mode」(デフォルト)を選択します。SCD Type 2 でデータを連携したい場合は、「History mode」を選択します。 今回は products テーブル の Sync Mode を History mode に変更し、SCD Type 2 での履歴管理を有効にしています。これにより、レコードの変更時に変更前の状態が履歴として保持されます。 同期の頻度は Sync frequency で設定します。プランに応じて最短 1 分間隔から選択できます。 これらの設定を行った上で Start Initial Sync をクリックすると、初回のフルロードが開始されます。ソーステーブルの既存データがすべて Iceberg テーブルに転送されます。 データ連携の確認 初回同期の確認 AWS マネージメントコンソール からGlue カタログを確認してみると、テーブルが連携されていることがわかります。 連携先の Iceberg テーブルは Amazon Athena や AWS Glue、Amazon Redshift などさまざまなエンジンからクエリすることが可能です。Iceberg テーブルへのクエリに対応している 3rd party の製品からのクエリももちろん可能です。 今回は、 Amazon SageMaker Unified Studio の AI エージェントが組み込まれたノートブック から Amazon Athena でクエリを行ってみました。以下の通り、連携された Iceberg テーブルを簡単にクエリすることができました。 ソーステーブルのカラムに加えて、Fivetran が自動的に付与するメタデータカラムが確認できます。 orders テーブル(通常の CDC)では、以下のメタデータカラムが追加されています。 _fivetran_deleted : ソース側で削除されたレコードを示すフラグ。削除が検知されると true になる _fivetran_synced : Fivetran がレコードを同期した日時 products テーブル(SCD Type 2)では、上記に加えて履歴管理のためのカラムが追加されています。 _fivetran_start : そのレコードが有効になった日時 _fivetran_end : そのレコードが無効になった日時。現在有効なレコードは 9999-12-31T23:59:59.999Z _fivetran_active : 現在有効なレコードかどうかを示すフラグ 現時点ではすべてのレコードが _fivetran_active = true で、初回同期時点のスナップショットが格納されている状態です。ここからソースデータベースに変更を加え、CDC による差分連携と SCD Type 2 の履歴管理の動作を確認していきます。 CDC による差分連携の確認 初回同期の完了後、ソースデータベースに対して INSERT、UPDATE、DELETE を実行し、変更が Iceberg テーブルに反映されることを確認します。 -- 新しい受注の追加 INSERT INTO orders (product_id, quantity, total_price, status, ordered_at) VALUES (1, 2, 256000, 'pending', NOW()); -- 受注ステータスの更新 UPDATE orders SET status = 'shipped' WHERE id = 3; -- 受注のキャンセル(削除) DELETE FROM orders WHERE id = 5; Fivetran の次回同期が実行された後、Athena で再度クエリを実行すると、これらの変更が反映されていることを確認できます。 id=6 として新しいレコードが追加されている(INSERT の反映) id=3 の status が pending から shipped に変更されている(UPDATE の反映) id=5 の _fivetran_deleted が true になっている(DELETE の反映) DELETE されたレコードはテーブルから物理的に削除されるのではなく、 _fivetran_deleted = true のフラグで論理削除として管理されます。これにより、削除の履歴を保持しつつ、分析時には WHERE _fivetran_deleted = false でフィルタすることで最新の有効データのみを参照できます。 SCD Type 2 による履歴管理の確認 products テーブルで商品の価格改定と販売終了を行い、SCD Type 2 による履歴管理を確認します。 -- 商品の価格改定 UPDATE products SET unit_price = 138000, updated_at = NOW() WHERE id = 1; -- 商品の販売終了 UPDATE products SET is_active = false, updated_at = NOW() WHERE id = 5; Fivetran の同期後、Amazon Athena で products テーブルをクエリすると、変更前と変更後の両方のレコードが保持されていることを確認できます。 たとえば id=1(ノートPC 14インチ)では、価格改定前の unit_price=128,000 のレコードが _fivetran_active = false として残り、改定後の unit_price=138,000 のレコードが _fivetran_active = true として追加されます。同様に id=5(USBハブ 7ポート)では、is_active が true だった時点のレコードと false に変更された後のレコードがそれぞれ保持されます。 このように、ソース側では単純な UPDATE であっても、 _fivetran_active = false のレコードも含めて参照することで「いつ、どのような値だったか」の履歴を分析できます。また、 _fivetran_active = true のレコードのみを対象にクエリすることで、テーブルの最新の状態を参照することもできます。 以上のウォークスルーで確認した通り、Fivetran MDLS を使うことで Aurora PostgreSQL から Iceberg テーブルへの CDC パイプラインを、コードを書くことなく構築できました。通常の CDC による INSERT/UPDATE/DELETE の即時反映に加え、SCD Type 2 による変更履歴の自動蓄積、テーブル・カラム単位の同期制御やスキーマ変更への対応など、運用に必要な機能が設定画面上で完結しています。連携されたデータは Glue Data Catalog を通じて Athena からすぐにクエリでき、分析基盤としてすぐに活用を開始できる状態になります。 まとめ 本記事では、Fivetran Managed Data Lake Service 及び CDC 機能を活用し、業務システムの RDBMS から Amazon S3 上の Apache Iceberg テーブルへリアルタイムにデータを連携する構成を紹介しました。Fivetran の Binary Log Reader による低負荷な変更データ取得と、AWS Glue データカタログへの自動メタデータ同期により、OLTP/OLAP の分離やバッチ処理のオフロードといったユースケースをシンプルな構成で実現が可能となります。
こんにちは、SCSKの松岡です🔗 データ連携の実装でAWS Glue (Python Shell Job)を導入した際の試行錯誤を整理しました。 RDSからデータレイクであるS3 Tablesに連携する際に、横展開可能な軽量なデータ連携ジョブを実現するために気にしたポイントについて紹介します。 背景 データ活用基盤を構築するにあたり、「データをどのように集めるか」は重要なテーマの一つです。 仮に収集元のシステムが単一であっても、対象となるテーブルが複数存在する場合、テーブルごとに連携方法を検討し、ジョブとして実装していく必要があります。そのため、連携対象のテーブル数が多い場合には、テーブル単位での開発工数をいかに抑えつつ、効率的に横展開できるかが重要なポイントとなります。 また、データ連携方式を検討する際には、データ量だけでなく実行頻度も重要な判断軸となります。1回あたりのデータ量が少量であっても、高頻度で実行する必要がある場合、実行回数に比例してコストが増加するため、想定以上にコストが膨らむ可能性があります。 連携元はRDS(PostgreSQL)のみだが、連携対象テーブルが多数 処理1回あたりのデータ量は少量だが、毎時で何度も差分連携しなければならない 開発コストと運用コストの両方をなるべく抑えたい データレイクとしてIceberg (S3 Tables)にデータを連携したい このような前提のもと、開発コストと運用コストの双方を抑えつつ、シンプルに実現できるデータ連携方式を検討しました。 構成と選定理由 Why AWS Glue(Python Shell)? データ連携方式として、AWSサービスを利用することを前提に、以下から比較検討しました。 AWS Glue (Python Shell) AWS Glue (Spark) AWS DMS AWS Lambda その結果、今回は AWS Glue (Python Shell) を採用しました。 主な理由は以下の通りです。 AWS Glue (Python Shell)は起動時間が短く、小規模かつ高頻度なデータ連携を効率的に実行できる AWS Glue (Python Shell)は、PythonベースでSQL抽出や変換処理を柔軟に制御できる AWS Glue (Spark)は大規模データ処理に適しているが、要件に対して過剰スペックになる AWS DMSには差分レプリケーション機能があるが、S3 Tablesに対応していない(※2025調査時点) AWS Lambdaで柔軟な処理開発が可能だが、15分の実行時間制約がある 参考:AWS Glue の料金 構成 データソースからRDS(PostgreSQL)に集約したデータを、AWS Glue(Python Shell)ジョブによりAmazon S3へ連携し、未加工データとしてS3 Tablesに蓄積します。その後、AthenaやRedshiftから参照・集計し、BIツール(Power BI)で可視化する構成としています。 構成図では簡略化していますが、ジョブの起動はGlueトリガーによりスケジュール実行し、実行結果の監視や通知についてはEventBridgeおよびAmazon SNSを組み合わせて実現しています。 ※本記事ではGlueジョブ中の変換処理は扱わず、未加工データをそのまま連携するシンプルなデータパイプラインにフォーカスしています。 気にしたポイント 差分更新の設計 Glueの処理にフォーカスしたデータ連携の概要は、以下の通りです。 Glueジョブを動かすための要素として、RDS側で管理・履歴テーブル、S3側に各種ファイル配置先を設定しています。 本構成では、RDS(PostgreSQL)に格納された業務データを、AWS Glue(Python Shell)ジョブによりS3へ連携します。 差分更新を実現するため、RDS側にはジョブ実行日時を管理する「管理テーブル」と「履歴テーブル」を配置し、S3側にはデータ出力先に加えて、Glueで利用するソースコードやドライバ、ライブラリを格納するバケットを用意しています。 Glueジョブは実行時に管理テーブルを参照し、前回処理日時をもとに差分データを抽出します。抽出したデータはS3へ出力され、その後、処理結果を管理テーブルへ反映することで、次回実行時の差分基準として利用されます。 また、GLueジョブの実行結果は履歴テーブルにも記録されるように設計しています。具体的には、各実行ごとに「連携対象日時(FROM/TO)」「処理件数」「実行成否」などをRDSに記録することで、過去の実行状況を追跡可能としています。 さらに、実行結果はCloudWatch Logsにも出力し、個々のジョブ実行の詳細を確認できるようにしています。 参考:AWS Glue ジョブのログ記録 ジョブのパラメータ設定 Glueジョブでは、パラメータを指定することで、ジョブごとに処理対象や抽出条件を動的に制御することができます。 このパラメータ設計は、複数テーブルのジョブ実装対応を効率化する上で重要なポイントとなります。テーブルごとに個別のジョブを1から作成すると開発負荷が高くなるため、ジョブをパラメータ化し、共通のジョブ定義をベースとして複製することで、複数テーブルに対応できるようにしました。 (GlueジョブはClone機能により簡単に複製が可能です) 具体的には、以下のようなパラメータを定義しています。 table_name :対象テーブル名を指定し、スクリプト内で参照するテーブルを動的に切り替える primary_key :主キー項目を指定し、更新処理時のキー条件を動的に制御する last_update_column :差分抽出に利用する列を指定し、当該列をもとにフィルタ条件を動的に生成する これにより、ジョブ定義の共通化と再利用性を確保しつつ、テーブル数が増加した場合でも効率的に展開できる構成としています。 参考:AWS Glue の Python パラメータの受け渡しとアクセス PostgreSQL → Icebergのデータ型変換 出力先をIcebergテーブルとして利用する前提のため、データ型の整合性を意識した設計を行いました。 PostgreSQLとIcebergでは型の扱いに差異があるため、スキーマをもとに列ごとの型変換を動的に適用する方式としています。これにより、テーブルごとに個別対応することなく、共通ロジックで型整合性を担保できる構成としています。 また、今回のケースに限らず、システム間連携においては、各システムで扱うデータ型の仕様を正しく理解しておくことが重要です。特に、型の不一致による連携漏れや桁あふれなどの問題が発生しないよう、事前に設計段階で考慮しておく必要があります。 参考:Icebergのデータ型の仕様 ライブラリ管理 Glue Python Shellでは、処理内容に応じて必要なPythonライブラリを自前で準備する必要があります。 今回の構成では、PostgreSQL接続用にpg8000、S3 Tables / Iceberg操作用にpyicebergなどのライブラリを利用しており、これらをwhlファイルとしてS3に配置して使用しています。 これらのライブラリを事前に配置しておくことで、実行時に外部リポジトリへアクセスして取得する必要がなく、インターネット接続に依存しない安定した実行環境を構築することができます。 また、Glue Python Shellにはあらかじめ利用可能なライブラリが含まれている一方で、用途によっては追加でライブラリを用意する必要があります。そのため、利用するライブラリが事前インストール済かどうかを確認した上で、必要に応じて追加対応を行うことが重要です。 さらに、ライブラリによってはGlueの実行環境との互換性によりそのまま利用できない場合もあるため、事前の検証やビルド方法の調整が必要となる点には注意が必要です。 参考:AWS Glue Pythonジョブにおけるライブラリ管理 まとめ AWS Glue Python Shellは、起動時間が短く、コストを抑えながらPythonで柔軟に実装できる点が特徴であり、軽量なデータ連携基盤として非常に有効な選択肢です。特に、SQL主体のシンプルなETL処理や高頻度のバッチにおいては、その特性を活かしやすく、効率的なデータ連携を実現できました。 一方で、ライブラリの準備や実行環境との互換性、差分更新の設計などは自前で考慮する必要があり、構成によっては設計・運用面での工夫が求められます。また、ジョブのパラメータ設計やログ出力などを含めた運用設計も重要なポイントとなります。そのため、処理内容やデータ量、運用要件に応じて、Glue SparkやDMS、リアルタイム連携であればKinesis Data Firehose、またはTROCCOのようなサードパーティ製ETLツールも含めて、適切に使い分けることが重要だと思いました。 本記事のような軽量かつ柔軟なデータ連携のユースケースにおいては、Glue Python Shellは実用性の高い選択肢であると感じました。 (宣伝) クラウドデータ活用サービス 今回ご紹介した内容は、SCSKで提供しているクラウドデータ活用サービスの中で扱っているテーマの一部になります。 お客様のデータ活用状況に応じて、基盤構築から可視化、データ連携、データマネジメント、高度データ活用までを段階的にご支援しています! ご関心あれば、以下のサービスページもご参照ください。 クラウドデータ活用サービス|サービス|企業のDX戦略を加速するハイブリッドクラウドソリューション クラウドデータ活用サービスは、お客様のデータ活用状況に合わせ、適切なご支援が可能です。 www.scsk.jp
こんにちは。SCSKの岡尾です。 今回は、AWS Glueを利用したETL処理を実装していた中でハマったポイントを紹介したいと思います。 私自身、ETLの実装は初めてでした。これからGlueを使ったETL処理実装していこうとしている方が同じようにつまずかないようにハマりポイントをご紹介できればと思います。 目次 はじめに ハマりどころ ネットワーク:Glueセキュリティグループの「自己参照」 トランザクション:Commit Failed Exception PySpark:メモリ不足エラー まとめ 1. はじめに 今回のプロジェクトでは、Amazon RDS上の業務データをS3 Tablesで構築したデータレイクへ同期するパイプラインを構築しました。 システム構成を簡略化した図が以下の通りです。ポイントとしてはRDSはVPC内のプライベートサブネットに配置されているというところです。 一見シンプルな構成ですが、実際に構築してみると思わぬ落とし穴がありました。 2.1【ハマりどころ①】 ネットワーク:Glueセキュリティグループの「自己参照」 最初のハマりどころは、GlueでのRDSへの接続設定です。 VPC内にあるRDSへGlueから接続する場合、Glue Connection(接続情報)を作成し、VPC・サブネット・セキュリティグループ(SG)を指定する必要があります。 ここで、Glue特有の要件として 自己参照ルール というものがあります。Glueジョブは、内部的にドライバーとワーカーノード間で通信を行います。この通信はVPC内に作成されたENIを経由して行われます。 そのため、 Glueに割り当てたセキュリティグループ自身が、そのセキュリティグループからの全TCP通信を許可している 必要があります。 そのため、Glueにアタッチするセキュリティグループのインバウンドルールには以下を追加する必要があります。 タイプ : すべてのTCP ポート範囲 : 0 – 65535 ソース : カスタム(自分自身のセキュリティグループID sg-xxxxxx) この設定がないと、Glueジョブの実装ができないようです。 2.2 【ハマりどころ②】トランザクション:Commit Failed Exceptionエラー 続いてのハマりどころは、icebergテーブルの書き込み競合です。 今回の実装では、Glueジョブは連携するテーブルの数だけ作成し同時に複数のジョブが起動するような構成としていました。このとき、偶発的にCommitの競合を示すような以下のエラーが発生しました。 「pyiceberg.exceptions.CommitFailedException: CommitFailedException: Request doesn’t meet the requirement condition: Requirement failed: branch main has changed: expected id …..」 調べてみると、以下の公式ブログにもある通り、共通のカタログを利用していると異なるテーブルであってもCommitFailedException が発生する仕様となっているようでした。 Manage concurrent write conflicts in Apache Iceberg on the AWS Glue Data Catalog これを回避するためには、このエラーが発生した場合にリトライ処理を実施するような実装が必要でした。異なるテーブルであれば同時にジョブ実行しても問題ないと思っていましたが、カタログが共通だと書き込みの競合が発生してしまうんですね。 2.3 【ハマりどころ③】PySpark:メモリ不足エラー 最後のハマりどころは、Glueジョブの処理性能です。 最初はコスト効率のいい Python Shell でpythonのpyicebergライブラリを利用した実装をしていました。 しかし、データ量が増え、数万行レベルになった際に、データをDataFrameとしてメモリに展開しようとした際に落ちてしまうMemoryErrorが発生 しました。 Python Shellで利用したpyicebergライブラリのupsert処理では、一度に処理できるデータの件数に制約があるようです。 そこで、 Python Shellでの戦いを諦め、分散処理が可能な Glue ETL (Spark) へ切り替えました。これにより、Sparkの分散処理により数十万件のデータも一度にupsertできるようになりました。また、 Worker Typeの選定も柔軟になり、DPUサイズも調整することで安定してデータ連携が可能になりました。 最初は小さくPythonで、将来的なデータ増加に応じてSpark構成を検討するというのがいいのではないかなと感じました。 3. まとめ 今回は、RDSのデータをGlueジョブを使ってS3 Tablesへ連携する際のハマりどころを紹介しました。 Glueはサーバレスのサービスであるために便利な側面が多い反面、そこで利用される仕組みを理解した上での実装が必要になると勉強になりました。 皆様もGlueを使う際にはぜひ参考にしてみてください!
動画
該当するコンテンツが見つかりませんでした












