はじめに こんにちは、XI本部エンジニアリングテクノロ ジー センターの徳山広士です。 この記事では、データ基盤/データ分析基盤におけるデータパイプラインのAI駆動開発の手法を提案します。 「 ODCS (Open Data Contract Standard) 」フォーマットの"Data Contract"を仕様書として生成AIへ与えてデータパイプラインのコードを生成させる手法です。 Data Contractは、データの仕様(テーブル構造、カラム定義、データ型など)と品質要件を YAML 形式で定義する標準フォーマットです。 当記事の主な内容は、Data Contractやローカル開発環境などの基礎的な説明から当件のAI駆動開発方法、コード生成に成功した検証結果についてです。 想定読者: データエンジニア、データマネジメント業務担当者 当記事で得られる知見: データパイプラインのAI駆動開発の勘所 前提知識: データパイプライン(ETL/ ELT 処理)の基礎知識 一般的に労力と時間のかかるデータマネジメント・サイクルをAI駆動開発によって加速できればと考えて、データエンジニアリングのAI駆動開発手法を探求し、この記事にまとめました。 データパイプラインのAI駆動開発実現の課題 生成AIに何を伝えるべきか アプリケーションのAI駆動開発においては、 ドメイン や ユースケース 、機能、制約などを生成AIに説明することでコード生成するアプローチを見聞きします。では、 データエンジニアリングにおいては何を説明すれば良いのか?どのように指示を出せば包括的に開発してくれるのか? これが最初に直面した課題でした。 自然言語 のプロンプトで目的の SQL クエリについて説明する方法も考えましたが、複雑なロジックの表現が困難であり、細かい要件が伝わらず、プロンプトの再現性・再利用性にも疑問がありました。 アプローチ: データ アーキテクチャ とデータモデルを伝える この課題に対して、データエンジニアリングの観点から「データ アーキテクチャ 」と「データモデル」の2つの軸で情報を整理し、データ基盤/データ分析基盤の全体像と開発方法を生成AIに伝えるアプローチを採用しました。 データ アーキテクチャ の説明 生成AIが全体像を理解するための情報: データレイヤー構造と各レイヤーの責務: raw, staging, core, martなど データストアの構成: データレイク, データウェアハウス, データマートなど 各データストアのデータフォルダ構成: "raw/システム名/YYYYMMDD"など データ処理アプローチ: ELT など インフラストラクチャ構成: 各データストアの物理実装で利用の SaaS など データパイプライン構成技術: dagster , dbt core など これらの内容を プロジェクトドキュメント(DESIGN.md等) として整備しました。 データモデルの説明 生成AIがデータパイプラインで実現すべき対象の情報: データの構成: テーブル, カラム, リレーションシップなど データ仕様: データ型, 主キー等の制約, 変換ロジックなど データ品質仕様: 満たすべき基準, テストルールなど これらを Data Contract(ODCS形式) としてドキュメント整備しました。 この2つを組み合わせることで、生成AIはデータパイプラインの実行環境全体の設計意図を理解し、データ基盤/データ分析基盤に適した具体的な実装を行えるようになりました。 データパイプライン作成依頼 生成AIへの開発依頼は、データパイプラインの要件を ADR (Architecture Decision Record) に記述し、その ADR で定義されたデータパイプラインを関連するData Contractにもとづいて開発するように生成AIへ依頼する方法を採用しました。 ADR に少なくとも以下の内容を書くようにしました。 データパイプラインの作成対象のデータ名 データ提供の背景や目的, データ利用の ユースケース ADR の記載内容の参考イメージ タイトル: P001 ECビジネスデータパイプライン構造 ステータス: Accepted コンテキスト: Eコマースビジネスパイプラインのデータ統合対象のデータを決める必要がある。 決定事項: Eコマースビジネスパイプラインは、web_salesとweb_returnsを対象とした構成にする。 検証内容の詳細 実際の検証環境や要件、作成対象のデータなどを説明します。 検証では TPC-DS (*1)のデータを使用し、生成AIには Claude Code(Sonnet 4.5, Anthropic) を使用しました。 web_salesテーブルの更新用のソースデータを変換してweb_salesへ統合するデータパイプラインの SQL クエリを生成します。 ※1 TPC -DS は Transaction Processing Performance Council ( TPC ) の商標です。 本記事は TPC ベンチマーク 結果の公表を目的としたものではありません。 ローカル開発環境 データパイプラインのデプロイ先は AWS やAzure、 Google Cloudなどの SaaS ですが、AI駆動開発を効率化するためにデプロイ先を模したローカル完結型の開発環境を整備しました。 データレイク兼データウェアハウス: DuckDB データ モデリング : dbt core オーケストレータ: dagster データ品質テスト: Soda Core DuckDBで構成 : SaaS への外部通信を排除し、生成AIの生産性を最大化。 SaaS で巨大になりがちなデータレイクやデータウェアハウスをコンパクトなDuckDBで代用しました。 コードベースのツール選定 : 生成AIがコードを直接参照・実装できるよう、全てコードベースのツールを採用。 GUI のETLツールは、生成AIとの連携で独自 SDK / API 経由が必要となり生産性の低下が懸念されるため使用しませんでした。 ツールによるインフラストラクチャの抽象化 : デプロイ先の SaaS とは物理的な環境構成が異なりますが、dbt coreなどのソフトウェアが抽象化し環境差分を吸収してくれます。 データ アーキテクチャ 概要 データパイプラインのデプロイ先のデータ分析基盤のデータ アーキテクチャ を図にしたものです。論理構成の主要部分のみを抜粋し、簡潔な内容を掲載しております。 データレイヤー: "raw", "staging", "core", "mart" の4層 データストア: データレイクとデータウェアハウス データ処理アプローチ: ELT データパイプライン構成技術: Dagster, dbt core, Soda Core インフラストラクチャ: AWS とAzure, Google Cloudの3大 クラウド サービスでそれぞれ実施、詳細は割愛 データモデル web sales ※一部のカラムは割愛 ディメンショナル モデリング されたスター スキーマ 構造となっており、複数の サロゲートキー を持っています。データ構造は把握しやすいもののETL/ ELT 処理の観点では10個以上のテーブル結合や中間テーブルを介したテーブル結合、一部カラムの演算処理が必要な複雑な構造になっています。 作成対象のデータパイプライン rawレイヤー 生データのデータファイルが外部テーブル (External table) としてデータウェアハウスに既にテーブル化されており、そのテーブルを参照すれば良いため、処理の実装は不要。 stagingレイヤー rawレイヤーのデータをもとにマスターデータとの結合によるデータ取得や演算処理などの ビジネスロジック をdbtの SQL クエリモデルとして実装 SELECT対象のカラム数: 35個 テーブル結合数: 12個 coreレイヤー stagingレイヤーのデータをもとに増分処理でデータ更新する処理をdbtの SQL クエリモデルとして実装 SELECT対象のカラム数: 38個 テーブル結合数: 0個 実装ルール SELECT文でカラムの型キャストを明記 SELECT文でカラムの名称をAS文で指定 CTE (Common Table Expression) でソースデータ定義 coreレイヤーでは増分処理を実装 生成AIへの依頼 前述の「 アプローチ: データアーキテクチャとデータモデルを伝える 」に記載のドキュメントに加えて、開発 ガイドライン も整備し開発依頼を行いました。 生成AIへ提供した情報 プロジェクトドキュメント Data Contract 開発 ガイドライン (開発規約や トラブルシューティング など) ADR (Architecture Decision Record) プロンプト 実際のプロンプト 以下のファイルを読み込んで、当プロジェクトについて理解してください。 - ./docs/DESIGN.md - ./docs/DATAMODEL_GENERATION_GUIDELINES.md ADRで定義されたデータパイプラインを関連する`Data Contract`にもとづいて作成します。 今回は、ADRの"./docs/adr/pipelines/P001-ecommerce-business-pipeline-structure.md"で決定したデータパイプラインを作成してください。 実際のData Contractのサンプル "ws_item_sk"という主要なカラムのData Contractを一部抜粋したものです。より詳しい内容と解説は 詳細解説 にて後述します。 - name : ws_item_sk businessName : ウェブ販売商品サロゲートキー logicalType : string physicalType : STRING primaryKey : true primaryKeyPosition : 1 transformSourceObjects : - core_tpcds.item.i_item_sk transformLogic : SELECT i_item_sk FROM s_web_order JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id) LEFT JOIN item ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL ) relationships : - type : foreignKey to : - core_tpcds.item.i_item_sk quality : - type : library rule : duplicateCount name : "主キー重複チェック(ws_item_sk)" dimension : uniqueness mustBe : 0 severity : error businessImpact : "重複レコードによりファクトテーブルのデータ整合性が崩れ、売上集計に誤差が生じる" 生成されたコード stagingレイヤー 12テーブルを結合し、35カラムを生成する SQL クエリです。Data Contractの transformLogic で定義した変換ロジックが正確に反映されています。 with s_web_order as ( select * from {{ source( ' raw_tpcds ' , ' s_web_order_1 ' ) }} ), s_web_order_lineitem as ( select * from {{ source( ' raw_tpcds ' , ' s_web_order_lineitem_1 ' ) }} ) SELECT d1.d_date_sk::STRING AS ws_sold_date_sk, t_time_sk::STRING AS ws_sold_time_sk, d2.d_date_sk::STRING AS ws_ship_date_sk, i_item_sk::STRING AS ws_item_sk, c1.c_customer_sk::STRING AS ws_bill_customer_sk, c1.c_current_cdemo_sk::STRING AS ws_bill_cdemo_sk, c1.c_current_hdemo_sk::STRING AS ws_bill_hdemo_sk, c1.c_current_addr_sk::STRING AS ws_bill_addr_sk, c2.c_customer_sk::STRING AS ws_ship_customer_sk, c2.c_current_cdemo_sk::STRING AS ws_ship_cdemo_sk, c2.c_current_hdemo_sk::STRING AS ws_ship_hdemo_sk, c2.c_current_addr_sk::STRING AS ws_ship_addr_sk, wp_web_page_sk::STRING AS ws_web_page_sk, web_site_sk::STRING AS ws_web_site_sk, sm_ship_mode_sk::STRING AS ws_ship_mode_sk, w_warehouse_sk::STRING AS ws_warehouse_sk, p_promo_sk::STRING AS ws_promo_sk, word_order_id::STRING AS ws_order_number, wlin_quantity:: INTEGER AS ws_quantity, i_wholesale_cost:: DECIMAL ( 7 , 2 ) AS ws_wholesale_cost, i_current_price:: DECIMAL ( 7 , 2 ) AS ws_list_price, wlin_sales_price:: DECIMAL ( 7 , 2 ) AS ws_sales_price, ((i_current_price:: DECIMAL ( 7 , 2 ) - wlin_sales_price:: DECIMAL ( 7 , 2 )) * wlin_quantity:: INTEGER ):: DECIMAL ( 7 , 2 ) AS ws_ext_discount_amt, (wlin_sales_price:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ):: DECIMAL ( 7 , 2 ) AS ws_ext_sales_price, (i_wholesale_cost:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ):: DECIMAL ( 7 , 2 ) AS ws_ext_wholesale_cost, (i_current_price:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ):: DECIMAL ( 7 , 2 ) AS ws_ext_list_price, (i_current_price:: DECIMAL ( 7 , 2 ) * web_tax_percentage:: DECIMAL ( 7 , 2 )):: DECIMAL ( 7 , 2 ) AS ws_ext_tax, wlin_coupon_amt:: DECIMAL ( 7 , 2 ) AS ws_coupon_amt, (wlin_ship_cost:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ):: DECIMAL ( 7 , 2 ) AS ws_ext_ship_cost, (wlin_sales_price:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER - wlin_coupon_amt:: DECIMAL ( 7 , 2 )):: DECIMAL ( 7 , 2 ) AS ws_net_paid, (((wlin_sales_price:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ) - wlin_coupon_amt:: DECIMAL ( 7 , 2 )) * ( 1 + web_tax_percentage:: DECIMAL ( 7 , 2 ))):: DECIMAL ( 7 , 2 ) AS ws_net_paid_inc_tax, (((wlin_sales_price:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ) - wlin_coupon_amt:: DECIMAL ( 7 , 2 )) - (wlin_quantity:: INTEGER * i_wholesale_cost:: DECIMAL ( 7 , 2 ))):: DECIMAL ( 7 , 2 ) AS ws_net_paid_inc_ship, ((wlin_sales_price:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ) - wlin_coupon_amt:: DECIMAL ( 7 , 2 ) + (wlin_ship_cost:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ) + i_current_price:: DECIMAL ( 7 , 2 ) * web_tax_percentage:: DECIMAL ( 7 , 2 )):: DECIMAL ( 7 , 2 ) AS ws_net_paid_inc_ship_tax, (((wlin_sales_price:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER ) - wlin_coupon_amt:: DECIMAL ( 7 , 2 )) - (i_wholesale_cost:: DECIMAL ( 7 , 2 ) * wlin_quantity:: INTEGER )):: DECIMAL ( 7 , 2 ) AS ws_net_profit, s_web_order.processing_date::STRING AS processing_date, current_timestamp :: TIMESTAMP AS ingested_at FROM s_web_order LEFT JOIN {{ ref( ' date_dim ' ) }} d1 ON ( cast (word_order_date AS date ) = d1.d_date) LEFT JOIN {{ ref( ' time_dim ' ) }} ON (word_order_time = t_time) LEFT JOIN {{ ref( ' customer ' ) }} c1 ON (word_bill_customer_id = c1.c_customer_id) LEFT JOIN {{ ref( ' customer ' ) }} c2 ON (word_ship_customer_id = c2.c_customer_id) LEFT JOIN {{ ref( ' web_site ' ) }} ON (word_web_site_id = web_site_id AND web_rec_end_date IS NULL ) LEFT JOIN {{ ref( ' ship_mode ' ) }} ON (word_ship_mode_id = sm_ship_mode_id) JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id) LEFT JOIN {{ ref( ' date_dim ' ) }} d2 ON ( cast (wlin_ship_date AS date ) = d2.d_date) LEFT JOIN {{ ref( ' item ' ) }} ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL ) LEFT JOIN {{ ref( ' web_page ' ) }} ON (wlin_web_page_id = wp_web_page_id AND wp_rec_end_date IS NULL ) LEFT JOIN {{ ref( ' warehouse ' ) }} ON (wlin_warehouse_id = w_warehouse_id) LEFT JOIN {{ ref( ' promotion ' ) }} ON (wlin_promotion_id = p_promo_id) coreレイヤー stagingレイヤーのデータを増分処理でcoreレイヤーに取り込む SQL クエリです。dbtのincrementalモデルを使用しています。 {{ config( materialized= ' incremental ' , unique_key=[ ' ws_item_sk ' , ' ws_order_number ' ], on_schema_change= ' fail ' , incremental_strategy= ' merge ' if target. type != ' duckdb ' else ' delete+insert ' ) }} -- Incrementalモデル:staging層からデータを取得 select ws_sold_date_sk::STRING as ws_sold_date_sk, ws_sold_time_sk::STRING as ws_sold_time_sk, ws_ship_date_sk::STRING as ws_ship_date_sk, ws_item_sk::STRING as ws_item_sk, ws_bill_customer_sk::STRING as ws_bill_customer_sk, ws_bill_cdemo_sk::STRING as ws_bill_cdemo_sk, ws_bill_hdemo_sk::STRING as ws_bill_hdemo_sk, ws_bill_addr_sk::STRING as ws_bill_addr_sk, ws_ship_customer_sk::STRING as ws_ship_customer_sk, ws_ship_cdemo_sk::STRING as ws_ship_cdemo_sk, ws_ship_hdemo_sk::STRING as ws_ship_hdemo_sk, ws_ship_addr_sk::STRING as ws_ship_addr_sk, ws_web_page_sk::STRING as ws_web_page_sk, ws_web_site_sk::STRING as ws_web_site_sk, ws_ship_mode_sk::STRING as ws_ship_mode_sk, ws_warehouse_sk::STRING as ws_warehouse_sk, ws_promo_sk::STRING as ws_promo_sk, ws_order_number::STRING as ws_order_number, ws_quantity:: INTEGER as ws_quantity, ws_wholesale_cost:: DECIMAL ( 7 , 2 ) as ws_wholesale_cost, ws_list_price:: DECIMAL ( 7 , 2 ) as ws_list_price, ws_sales_price:: DECIMAL ( 7 , 2 ) as ws_sales_price, ws_ext_discount_amt:: DECIMAL ( 7 , 2 ) as ws_ext_discount_amt, ws_ext_sales_price:: DECIMAL ( 7 , 2 ) as ws_ext_sales_price, ws_ext_wholesale_cost:: DECIMAL ( 7 , 2 ) as ws_ext_wholesale_cost, ws_ext_list_price:: DECIMAL ( 7 , 2 ) as ws_ext_list_price, ws_ext_tax:: DECIMAL ( 7 , 2 ) as ws_ext_tax, ws_coupon_amt:: DECIMAL ( 7 , 2 ) as ws_coupon_amt, ws_ext_ship_cost:: DECIMAL ( 7 , 2 ) as ws_ext_ship_cost, ws_net_paid:: DECIMAL ( 7 , 2 ) as ws_net_paid, ws_net_paid_inc_tax:: DECIMAL ( 7 , 2 ) as ws_net_paid_inc_tax, ws_net_paid_inc_ship:: DECIMAL ( 7 , 2 ) as ws_net_paid_inc_ship, ws_net_paid_inc_ship_tax:: DECIMAL ( 7 , 2 ) as ws_net_paid_inc_ship_tax, ws_net_profit:: DECIMAL ( 7 , 2 ) as ws_net_profit, processing_date::STRING as processing_date, ingested_at:: TIMESTAMP as ingested_at, current_timestamp :: TIMESTAMP as inserted_at, current_timestamp :: TIMESTAMP as updated_at from {{ ref( ' web_sales_view ' ) }} {% if is_incremental() %} -- incrementalの場合、processing_dateでフィルタリング where processing_date = ' {{ var("processing_date", "20240101") }} ' {% endif %} 詳細解説 Data Contractとは? Data Contractは、データの提供者と利用者の間でデータ仕様やデータ品質要件を合意するための標準フォーマットです。 フォーマットの既定の項目を使ってデータ仕様やデータ品質要件を定義することができます。 既定の項目がサポートしているものは、テーブル スキーマ や各種の論理名、データの粒度などの メタデータ に加えて、サンプルデータ、データ品質ルールなど多岐に渡ります。 当検証では「 ODCS (Open Data Contract Standard) 」という YAML 形式のフォーマットを使用します。ODCSは Linux Foundation傘下のBitolプロジェクトがサポートする代表的な仕様です。 当記事では、Data Contractを生成AIへ与えるコンテキストとして活用します。Data Contractの作成自体は設計プロセスで生成AIに行わせることも可能です(別記事で説明予定)。 今回使用のData Contractの主要項目 主に使用した項目と各項目の簡単な説明を記載します。 name: カラム物理名 businessName: カラム論理名 logicalType: 論理データ型 physicalType: 物理データ型 transformSourceObjects: カラムのデータを作成する際に必要なソースデータに関する情報 transformLogic: カラムのデータを作成する際に必要なデータ変換ロジック relationships: カラムのリレーションシップ情報 quality: カラムのデータ品質要件 実際のData Contract 以下、代表的な2つのカラムのData Contractを抜粋します。 主キー兼外部キーの例: ws_item_sk こちらは前述のData Contractの詳述です。 複合主キーの一部であり外部キーでもあるカラムです。3テーブルの結合と、3種類のデータ品質テストが定義されています。 - name : ws_item_sk businessName : ウェブ販売商品サロゲートキー logicalType : string physicalType : STRING primaryKey : true primaryKeyPosition : 1 transformSourceObjects : - core_tpcds.item.i_item_sk transformLogic : SELECT i_item_sk FROM s_web_order JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id) LEFT JOIN item ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL ) relationships : - type : foreignKey to : - core_tpcds.item.i_item_sk customProperties : - target_contract_id : "core_tpcds.item" - target_contract_path : "./core_tpcds/item.yaml" - target_property : "i_item_sk" quality : - type : library rule : duplicateCount name : "主キー重複チェック(ws_item_sk)" dimension : uniqueness mustBe : 0 severity : error businessImpact : "重複レコードによりファクトテーブルのデータ整合性が崩れ、売上集計に誤差が生じる" - type : library rule : nullCount name : "主キーNULLチェック(ws_item_sk)" dimension : completeness mustBe : 0 severity : error businessImpact : "NULL値が存在すると商品別売上分析が不可能になる" - type : sql name : "商品マスタ参照整合性チェック" dimension : consistency query : | SELECT COUNT(*) FROM ${object} ws LEFT JOIN item i ON ws.ws_item_sk = i.i_item_sk WHERE ws.ws_item_sk IS NOT NULL AND i.i_item_sk IS NULL mustBe : 0 severity : error businessImpact : "参照整合性が崩れると商品マスタとの結合で欠損が発生し、商品情報を取得できない売上データが生じる" 複雑な計算ロジックの例: ws_net_profit 純利益は複数のソースから計算され、計算整合性と平均値の妥当性の両方がチェックされます。 - name : ws_net_profit businessName : ウェブ純利益 logicalType : number physicalType : DECIMAL(7, 2) transformSourceObjects : - raw.s_web_order_lineitem.wlin_sales_price - raw.s_web_order_lineitem.wlin_quantity - raw.s_web_order_lineitem.wlin_coupon_amt - core_tpcds.item.i_wholesale_cost transformLogic : SELECT (((wlin_sales_price::DECIMAL(7,2) * wlin_quantity::INTEGER) - wlin_coupon_amt::DECIMAL(7,2)) - (i_wholesale_cost::DECIMAL(7,2) * wlin_quantity::INTEGER))::DECIMAL(7,2) FROM s_web_order JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id) LEFT JOIN item ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL ) quality : - type : sql name : "純利益計算整合性チェック" dimension : accuracy query : | SELECT COUNT(*) FROM ${object} WHERE ws_net_profit IS NOT NULL AND ws_net_paid IS NOT NULL AND ws_ext_wholesale_cost IS NOT NULL AND ABS(ws_net_profit - (ws_net_paid - ws_ext_wholesale_cost)) > 0.01 mustBe : 0 severity : error businessImpact : "計算式が不正確な場合、利益分析の信頼性を損なう" - type : sql name : "純利益平均値チェック" dimension : accuracy query : | SELECT AVG(ws_net_profit) FROM ${object} WHERE ws_net_profit IS NOT NULL mustBeBetween : [ -100, 1000 ] severity : warning businessImpact : "平均利益が異常値の場合、価格設定や原価データに問題がある可能性" transformSourceObjectsとtransformLogicの重要性 当検証で特に試行錯誤したのが、この2つのプロパティの記述でした。 当初の仮説 : 各プロパティでデータ仕様を記述し、relationshipsプロパティでデータ間の関係性を記述すれば、生成AIがJOIN処理やカラム毎の計算ロジック、変換ロジックを類推できるのではないか 実際の課題 : データ仕様やリレーションシップ(データモデル上の関係性)とETL/ ELT 処理(データ変換ロジック)の間には隔たりがある 例えば、 ws_item_sk カラムは item テーブルと外部キー関係にありますが、実際のデータ取得には s_web_order → s_web_order_lineitem → item という3テーブルの結合が必要です。 これは生成AIに分かってもらえそうで分かってもらえませんでした。 s_web_order と item の2つのテーブルを無理に結合しようとしてしまいますし、チャットで説明して改善しても後で忘れて問題が再発し、忘れないように記録してもらっても個別のケースの実装方法として記録されて他ケースへの応用が懸念されました。 他に、生成AIは サロゲートキー どうしでテーブル結合したり、主キーで結合するように指示して改善しても今度は肝心の サロゲートキー を取得してくれなかったりしました。 また、カラムの演算処理は間違った内容が実装されるか全く実装されないかでした。 そのため、以下の2つのプロパティを使うようにしました: transformSourceObjects : データの出所(ソーステーブル・カラム)を明示 transformLogic : 具体的な変換ロジック(JOIN条件、フィルタ、計算式など)を記述 これにより、生成AIは「何を参照して、どう変換するか」を正確に理解し、意図通りの SQL を生成できるようになりました。 transformSourceObjectsとtransformLogicの作成方法 transformSourceObjectsプロパティとtransformLogicプロパティの作成自体をデータモデルの設計過程で生成AIと共に行います。 もし、データソース側のシステムで業務知識 (例. 純利益の計算式) が何かドキュメントなどに整理されてあれば、その内容をもとに生成AIが両プロパティを定義できる可能性があります。 また、例えばディメンショナル モデリング であれば、データ粒度 (Grain) の定義やConformance Matrixなどを機械判読可能なファイル形式で生成AIと共に作成し、それら成果物をソースとして生成AIが両プロパティを作成できる可能性があります。 relationshipsプロパティは ODCS v3.1.0 にリリース予定 ODCSの現在の最新公式バージョンはv3.0.2ですが、このバージョンには relationshipsプロパティが存在しません 。 ODCS v3.1.0でrelationshipsプロパティが実装される予定で、当検証ではODCSの v3.1.0 を先行的に使用しています。 v3.1.0での追加予定: ODCS v3.1.0 RFC0013 その他のプロパティの作成方法 nameプロパティやphysicalTypeプロパティなどの基礎的なプロパティの作成は、データソースのDBの スキーマ 情報や生データのデータファイルをソースとして生成AIに与えることで半自動生成もしくは自動生成が可能です。 Excel などの固有ソフトウェアのファイル形式の場合は、機械判読可能なオープンなファイル形式への変換が必要です。 さいごに 検証の振り返り 本記事では、Data Contract(ODCS形式)を活用したデータパイプラインのAI駆動開発手法を検証しました。 成果として得られたこと : 12テーブル結合、35カラムの複雑な SQL クエリを生成AIが正確に生成 Data Contractに定義した型キャスト、計算ロジック、JOIN条件が意図通りに実装 増分処理やdbtのincremental設定も適切に生成 成功要因 : データ アーキテクチャ とデータモデルの2軸でコンテキストを整理 データ項目やデータ型、制約などの スキーマ 情報に加えて、 transformSourceObjects と transformLogic でデータリネージを明示 ローカル完結型の開発環境で生成AIの試行錯誤を高速化 所感 データエンジニアリングのAI駆動開発に向けたドキュメント整備方法の1つを見出すことができて良かったと思います。 データ基盤/データ分析基盤を概念的に分解し、既存の専門用語に当てはめて構造化して説明することで生成AIが体系的かつ詳細に理解するようになることが実感できました。 また、特定の製品・サービスに依存せずに進められましたので、再利用性が比較的に高いと考えており、今後の応用として関連の製品・サービスと連携しDataOpsやAIOpsへ昇華できればと期待しています。 データマネジメントのより広い範囲への活用の可能性 本記事ではデータパイプラインのコード生成に焦点を当てましたが、Data Contract自体の生成やデータ品質テストの自動生成など、より広い範囲への活用も考えられます。実際にData Contractの quality プロパティを応用したデータ品質テストコードの生成を検証しており、その内容は別途記事化の予定です。 また、データの意味的な定義やデータ統合に関する マッピング 情報などを整備してData Contractを拡張することで、例えばデータマネジメント全般における機械判読可能なドキュメントとしての活用やAI伴走型の分析データモデル開発への応用も検討しています。 データエンジニアとして、データマネジメントにおけるData Contractと生成AIの活用可能性については、今後も継続的に検証を進めていく予定です。 私たちは一緒に働いてくれる仲間を募集しています! 電通総研 キャリア採用サイト 電通総研 新卒採用サイト 執筆: @shikarashika レビュー: @yamada.y ( Shodo で執筆されました )