データ分析基盤グループでデータエンジニアをしている平川です。 DataVaultに関する記事の第2回目となります。(第1回の記事は こちら です) 第2回の記事は、DataVaultモデリングの中心となるHub/Link/Satelliteをdbtのパッケージを利用して作っていくという内容です。 2,3回目の内容が当初と少し変わっていますので、再掲いたします。 第1回: DataVaultってなに?どんな特徴があるの? 第2回: automate_dvを使ってDataVaultモデリングの中心となるテーブルを作ってみてわかったこと ← 今回はここ 第3回: BusinessVault、発展的なSatelliteテーブルやキーがNullだった場合の対処方法についてなど 前回のおさらい はじめに automate_dvについて automate_dvとは? 便利な点は何? 注意点 automate_dvの使い方 インストール方法 Hub/Link/Satelliteの実装 ハッシュキーの生成 Hubの生成 Linkの生成 Satelliteの生成 automate_dvを使う際の小技 automate_dvを使用していてハマったポイント 履歴化されているデータソースを取り込む場合 取り込みたいモデルの構造 対象のモデルのデータの変化 automate_dvを使ってSatelliteテーブルを作った際の結果 RawVault層までの実装にautomate_dvを使った感想など まとめと次回予告 参考資料 前回のおさらい 前回の記事では、データウェアハウス設計における1つのアプローチであるDataVaultの特徴やメリットについて説明しました。DataVaultは、柔軟性や拡張性に優れ、大量のデータを効果的に管理することができます。 また、DataVaultを実装するためには、Hub/Link/Satelliteという構造を持ったテーブルを生成する必要があります。これらのテーブルの生成には手間がかかることがありますが、automate_dvというパッケージを使うことでテーブル生成を簡易にすることができます。 今回の記事では、このテーブルの生成をサポートするautomate_dvの紹介と実際のテーブル生成までの手順を解説します。 はじめに 第2弾の記事では、データウェアハウスのテーブル構築方法について、automate_dv 1 を利用した手法について紹介します。 まずは、前回の内容を振り返りつつ、データウェアハウスの設計に関する用語を簡単に説明します。 用語 意味 ELT データウェアハウスにおけるデータの取り込み方法の1つで、データを抽出してから変換し、最後にロードする手法です dbt データウェアハウスの構築や管理をするためのオープンソースのツールです。SQL(一部Jinja)でデータパイプラインを記述できます。 DataVault データウェアハウスの設計パターンの1つで、拡張性や柔軟性の高さなどが特徴です。 Hub DataVaultで構築する上での中心となるテーブルで、ビジネスキーとそのハッシュキーで構成されます。 Link DataVaultでのHub同士の関係性を表すテーブルで、関連するHubのハッシュキーと関連する2つのHubのビジネスキーをconcatしてハッシュ化したキーで構成されます。 Satellite HubやLinkのキーに対して付随する属性情報を保持するテーブルです。属性情報をまとめてハッシュ化したHashdiffカラムによって、変化を検知することができます。 次のセクションでは、automate_dvを使用してDataVaultのテーブル構築を行う方法について、手順を解説します。 automate_dvについて automate_dvとは? dbtのパッケージの1つで、DataVault2.0モデルに基づいたデータウェアハウスの構築をサポートしてくれます。このパッケージのマクロを利用することで、DataVaultモデリングに関するテーブルのSQL実装を簡単に行うことができます。 便利な点は何? automate_dvを利用することで、Hub/Link/Satelliteテーブルやハッシュ化したキーを含むテーブルの作成を容易にすることができます。 これにより、SQLの記述量を大幅に削減し、テーブルの実装にかかる時間を短縮することができます。 注意点 automate_dvは便利なツールですが、利用しているプラットフォームによっては使用できないマクロがあるため注意が必要です。 例えば、一部のマクロはRedshiftやPostgreSQLでは利用できません。一方で、Snowflake/BigQuery/MS SQL Serverでは、現在一般提供されているマクロを使用することができます。 プラットフォームによって利用できる機能に違いがあるため、使用する前にドキュメントを参照することをおすすめします。 automate_dvの使い方 インストール方法 automate_dvを使うにはdbtを実行する環境にインストールする必要があります。dbtのプロジェクト直下にある。 package.yml に以下のようにautomate_dvパッケージの依存関係を追加し、 dpt deps を実行してください。 packages : - package : Datavault-UK/automate_dv version : 0.9.5 Hub/Link/Satelliteの実装 dbt run or dbt build を実行することで作成したSQLから各種ビューとテーブルを作成してくれます。 Hub/Link/Satelliteを作成するための各SQLの書き方を以下で説明していきます。 ハッシュキーの生成 Hub/Link/Satelliteを実装するにはそれぞれハッシュ化されたキーとhashdiffが必要になります。 そのため、実装する3テーブルが参照するビューも必要になるため、まずはハッシュキーを含むビューをautomate_dvを使って実装していきます。 実装例として、 orders , customers , products , customers_orders , orders_products という5つのソーステーブルからハッシュキーを含むビューを作成していきます。 hashed_ordersのSQLは下記のようになります。(hashed_orders以外のSQLについては折り畳んでありますので詳細を知りたい方は展開していただければと思います。) {%- set yaml_metadata -%} -- ① source_model: -- ② orders derived_columns: RECORD_SOURCE: " !ORDER_MANAGEMENT " hashed_columns: ORDER_HASH_DIFF: is_hashdiff: true columns: - ORDER_NAME - ORDER_AMOUNT - CREATED_AT ORDER_HK: - ORDER_ID {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.stage( include_source_columns= true , source_model=metadata_dict[ " source_model " ], derived_columns=metadata_dict[ " derived_columns " ], hashed_columns=metadata_dict[ " hashed_columns " ], null_columns=none, ranked_columns=none, ) }} -- ③ ① : jinjaの記述方法でyaml形式で各種パラメータを設定する ② : ハッシュキーの設定や参照するモデルの設定などをする ③ : automate_dvのstageマクロを使い各種パラメータを引数に設定する その他のモデルのクエリ hashed_products {%- set yaml_metadata -%} source_model: products derived_columns: RECORD_SOURCE: "!ORDER_MANAGEMENT" hashed_columns: HASH_DIFF: is_hashdiff: true columns: - PRODUCT_NAME - PRICE - CREATED_AT PRODUCT_HK: - PRODUCT_ID {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.stage( include_source_columns=true, source_model=metadata_dict["source_model"], derived_columns=metadata_dict["derived_columns"], hashed_columns=metadata_dict["hashed_columns"], null_columns=none, ranked_columns=none, ) }} hashed_customers {%- set yaml_metadata -%} source_model: customers derived_columns: RECORD_SOURCE: "!ORDER_MANAGEMENT" hashed_columns: HASH_DIFF: is_hashdiff: true columns: - EMAIL - PREFECTURE - CREATED_AT CUSTOMER_HK: - CUSTOMER_ID {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.stage( include_source_columns=true, source_model=metadata_dict["source_model"], derived_columns=metadata_dict["derived_columns"], hashed_columns=metadata_dict["hashed_columns"], null_columns=none, ranked_columns=none, ) }} hashed_customers_orders {%- set yaml_metadata -%} source_model: customers_orders derived_columns: RECORD_SOURCE: "!ORDER_MANAGEMENT" hashed_columns: HASH_DIFF: is_hashdiff: true columns: - CREATED_AT CUSTOMER_HK: - CUSTOMER_ID ORDER_HK: - ORDER_ID CUSTOMER_ORDER_HK: - CUSTOMER_ID - ORDER_ID {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.stage( include_source_columns=true, source_model=metadata_dict["source_model"], derived_columns=metadata_dict["derived_columns"], hashed_columns=metadata_dict["hashed_columns"], null_columns=none, ranked_columns=none, ) }} hashed_orders_products {%- set yaml_metadata -%} source_model: orders_products derived_columns: RECORD_SOURCE: "!ORDER_MANAGEMENT" hashed_columns: HASH_DIFF: is_hashdiff: true columns: - CREATED_AT ORDER_HK: - ORDER_ID PRODUCT_HK: - PRODUCT_ID ORDER_PRODUCT_HK: - ORDER_ID - PRODUCT_ID {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.stage( include_source_columns=true, source_model=metadata_dict["source_model"], derived_columns=metadata_dict["derived_columns"], hashed_columns=metadata_dict["hashed_columns"], null_columns=none, ranked_columns=none, ) }} 続いて、今作成したハッシュキー含むモデルから、Hub/Link/Satelliteを作成していきます。 Hubの生成 ordersのHubテーブルの作成は以下のSQLのようになります。(customersとproductsのHubテーブルは折りたたみ内にSQLを記載しています) {{ config(materialized= " incremental " ) }} {%- set yaml_metadata -%} source_model: hashed_orders src_pk: ORDER_HK src_nk: ORDER_ID src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.hub( src_pk=metadata_dict[ " src_pk " ], src_nk=metadata_dict[ " src_nk " ], src_ldts=metadata_dict[ " src_ldts " ], src_source=metadata_dict[ " src_source " ], source_model=metadata_dict[ " source_model " ], ) }} その他のモデルのクエリ hub_products {{ config(materialized="incremental") }} {%- set yaml_metadata -%} source_model: hashed_products src_pk: PRODUCT_HK src_nk: PRODUCT_ID src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.hub( src_pk=metadata_dict["src_pk"], src_nk=metadata_dict["src_nk"], src_ldts=metadata_dict["src_ldts"], src_source=metadata_dict["src_source"], source_model=metadata_dict["source_model"], ) }} hub_customers {{ config(materialized="incremental") }} {%- set yaml_metadata -%} source_model: hashed_customers src_pk: CUSTOMER_HK src_nk: CUSTOMER_ID src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.hub( src_pk=metadata_dict["src_pk"], src_nk=metadata_dict["src_nk"], src_ldts=metadata_dict["src_ldts"], src_source=metadata_dict["src_source"], source_model=metadata_dict["source_model"], ) }} Linkの生成 orderとproductの関係性を記述するLinkテーブルの作成は以下のSQLのようになります。(link_customers_ordersは折りたたみ内にSQLを記載しています。) {{ config(materialized= " incremental " ) }} {%- set yaml_metadata -%} source_model: hashed_orders_products src_pk: ORDER_PRODUCT_HK src_fk: - ORDER_HK - PRODUCT_HK src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv. link ( src_pk=metadata_dict[ " src_pk " ], src_fk=metadata_dict[ " src_fk " ], src_ldts=metadata_dict[ " src_ldts " ], src_source=metadata_dict[ " src_source " ], source_model=metadata_dict[ " source_model " ], ) }} その他のモデルのクエリ link_customers_orders {{ config(materialized="incremental") }} {%- set yaml_metadata -%} source_model: hashed_customers_orders src_pk: CUSTOMER_ORDER_HK src_fk: - CUSTOMER_HK - ORDER_HK src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.link( src_pk=metadata_dict["src_pk"], src_fk=metadata_dict["src_fk"], src_ldts=metadata_dict["src_ldts"], src_source=metadata_dict["src_source"], source_model=metadata_dict["source_model"], ) }} Satelliteの生成 orderの属性情報を記述するSatelliteテーブルの作成は以下のSQLのようになります。(sat_customersとsat_productsは折りたたみ内にSQLを記載しています。) {{ config(materialized= ' incremental ' ) }} {%- set yaml_metadata -%} source_model: hashed_orders src_pk: ORDER_HK src_hashdiff: HASH_DIFF src_payload: - ORDER_NAME - ORDER_AMOUNT - CREATED_AT src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.sat( src_pk=metadata_dict[ " src_pk " ], src_hashdiff=metadata_dict[ " src_hashdiff " ], src_payload=metadata_dict[ " src_payload " ], src_ldts=metadata_dict[ " src_ldts " ], src_source=metadata_dict[ " src_source " ], source_model=metadata_dict[ " source_model " ], ) }} その他のモデルのクエリ sat_customers {{ config(materialized='incremental') }} {%- set yaml_metadata -%} source_model: hashed_customers src_pk: CUSTOMER_HK src_hashdiff: HASH_DIFF src_payload: - EMAIL - PREFECTURE - CREATED_AT src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.sat( src_pk=metadata_dict["src_pk"], src_hashdiff=metadata_dict["src_hashdiff"], src_payload=metadata_dict["src_payload"], src_ldts=metadata_dict["src_ldts"], src_source=metadata_dict["src_source"], source_model=metadata_dict["source_model"], ) }} sat_products {{ config(materialized='incremental') }} {%- set yaml_metadata -%} source_model: hashed_products src_pk: PRODUCT_HK src_hashdiff: HASH_DIFFF src_payload: - PRODUCT_NAME - PRICE - CREATED_AT src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.sat( src_pk=metadata_dict["src_pk"], src_hashdiff=metadata_dict["src_hashdiff"], src_payload=metadata_dict["src_payload"], src_ldts=metadata_dict["src_ldts"], src_source=metadata_dict["src_source"], source_model=metadata_dict["source_model"], ) }} automate_dvを使う際の小技 automate_dvのドキュメントでは、stageマクロを実行する際のsource_modelには最新分だけを含んだモデルを対象にするのがベストプラクティスとされています。 ですが、止むを得ず参照するモデルに複数日付が含まれていることがあるかもしれません。この場合、最新分だけが存在する層を作るということもできますが、stageマクロを使いつつ、最新分だけのビューを作成することも可能です。 具体的な手順としては、 automate_dv.stage 部分をwith句に入れて一時テーブルとして定義し、日付を絞る方法です。以下のように記述することで実現できます -- パラメータの設定は省略 with stage as ( {{ automate_dv.stage( include_source_columns= true , source_model=metadata_dict[ " source_model " ], derived_columns=metadata_dict[ " derived_columns " ], hashed_columns=metadata_dict[ " hashed_columns " ], null_columns=none, ranked_columns=none, ) }} ) select * from stage where loaded_at = ' YYYY-MM-DD ' automate_dvを使用していてハマったポイント automate_dvは、データウェアハウスの構築において高いパフォーマンスを発揮するツールですが、日本語の情報も少なくハマってしまった時に原因を理解するのに時間がかかると思います。ここでは、実際にDataVault層のSatelliteテーブルを構築する際にハマったポイントについて説明していきます。 履歴化されているデータソースを取り込む場合 Satelliteテーブルに取り込む対象が、前日からの増分のみであれば、automate_dvのSatelliteマクロを使用しても問題ありません。しかし、Satelliteが参照するモデルが過去から現在までの全期間のレコードを持っている場合、予期しない結果になる可能性があります。具体的なサンプルと動作を見ていきましょう。 取り込みたいモデルの構造 カラム名 history_id customer_id amount created_at loaded_at 対象のモデルのデータの変化 2023-04-02 history_id customer_id amount created_at loaded_at 1 1 1000 2023-04-01 12:00:00 2023-04-02 00:00:00 2 2 1500 2023-04-01 13:00:00 2023-04-02 00:00:00 2023-04-03 history_id customer_id amount created_at loaded_at 1 1 1000 2023-04-01 12:00:00 2023-04-03 00:00:00 2 2 1500 2023-04-01 13:00:00 2023-04-03 00:00:00 3 1 1000 2023-04-02 12:00:00 2023-04-03 00:00:00 4 1 3000 2023-04-02 12:30:00 2023-04-03 00:00:00 2023-04-04 history_id customer_id amount created_at loaded_at 1 1 1000 2023-04-01 12:00:00 2023-04-04 00:00:00 2 2 1500 2023-04-01 13:00:00 2023-04-04 00:00:00 3 1 1000 2023-04-02 12:00:00 2023-04-04 00:00:00 4 1 3000 2023-04-02 12:30:00 2023-04-04 00:00:00 5 3 5000 2023-04-03 15:00:00 2023-04-04 00:00:00 6 1 1000 2023-04-03 16:00:00 2023-04-04 00:00:00 automate_dvを使ってSatelliteテーブルを作った際の結果 SatelliteテーブルのSelect結果 上記のSatelliteテーブルの赤い枠で囲まれた1行目と5行目は同一のレコードになっています。全期間の履歴を保持するようなモデルに対してSatelliteマクロを使うとこのような現象が起きてしまいます。 このような状態になってしまう原因は、automate_dvのマクロを使用することで生成されるSQLを見ることでわかります。 以下生成されたSQLを抜粋 WITH source_data AS ( SELECT a.CUSTOMER_HK, a.HASH_DIFF, a.AMOUNT, a.CREATED_AT, a.LOADED_AT, a.RECORD_SOURCE FROM hs_order_histories AS a WHERE a.CUSTOMER_HK IS NOT NULL ), latest_records AS ( SELECT a.CUSTOMER_HK, a.HASH_DIFF, a.LOADED_AT FROM ( SELECT current_records.CUSTOMER_HK, current_records.HASH_DIFF, current_records.LOADED_AT, RANK () OVER ( PARTITION BY current_records.CUSTOMER_HK ORDER BY current_records.LOADED_AT DESC ) AS rank FROM sat_order_histories AS current_records JOIN ( SELECT DISTINCT source_data.CUSTOMER_HK FROM source_data ) AS source_records ON current_records.CUSTOMER_HK = source_records.CUSTOMER_HK ) AS a WHERE a. rank = 1 ), records_to_insert AS ( SELECT DISTINCT stage.CUSTOMER_HK, stage.HASH_DIFF, stage.AMOUNT, stage.CREATED_AT, stage.LOADED_AT, stage.RECORD_SOURCE FROM source_data AS stage LEFT JOIN latest_records ON latest_records.CUSTOMER_HK = stage.CUSTOMER_HK AND latest_records.HASH_DIFF = stage.HASH_DIFF WHERE latest_records.HASH_DIFF IS NULL ) SELECT * FROM records_to_insert このSQLでは、ソースモデル(ハッシュキーを含むモデル)とSatelliteテーブルから条件を付与して抽出したテーブルを結合して、新たに追加するレコードを作成しています。 with句の2番目に定義されている最新レコード(latest_records)を見るとRankを使用して最新のレコードを抽出しています。 Satelliteテーブルに追加されるレコードは、ソースデータと最新レコードを結合し、ソースデータにだけ存在するレコードを抽出しています。 (where句での latest_records.HASH_DIFF IS NULL でソースデータにだけ存在するレコードを取ろうとしています。) 今回例として取り上げたような、同じハッシュキーに対して、異なるHASH_DIFFが追加されるようなテーブルの場合、Satelliteマクロは予期した動作をしないことがあります。 対応策としては、最初に述べたように、Satelliteテーブルが参照するモデルには最新分しか含まないようにすることです。中間層に手を加えたくない場合は、SQLでSatelliteテーブルに存在しないレコードのみを抽出するようなSQLを書きます。 select distinct customer_hk, hash_diff, amount, created_at, loaded_at, record_source from hs_order_history as stg where customer_hk is not null {% if is_incremental() %} and not exists ( select 1 from ( select customer_hk, hash_diff from {{ this }} as cur where stg.customer_hk = cur.customer_hk and stg.hash_diff = cur.hash_diff ) ) {% endif %} RawVault層までの実装にautomate_dvを使った感想など 1つのビジネスキーに対して、ログを溜め続けるようなデータソースがある場合、Satelliteマクロを使用する際に生成されるSQLを見ると、余計なレコードが生成されてしまうことがあります。そのため、データソースによってはマクロの使用の有無や参照モデルの修正など、使い分けや手を加える必要があり、少し面倒に感じました。 マクロを使用することで、キーのハッシュ化など、SQLで記述する場合には多くのコード量が必要な部分をマクロで置き換えられるのはすごく便利に感じました。 予期しない動作が発生した場合、コンパイルされたSQLを見ることで原因を理解することはできますが、automate_dvの実装を見るとマクロ内でマクロを呼び出しているためautomate_dvでの仕組みを理解するのには時間がかかる印象でした。 データソース側で物理削除されている場合、Satelliteではレコードが削除されたかどうかが判断できないので、StatusTrackingSatelliteが欲しくなる場面がありましたが、現在(2023年6月時点)はautomate_dvにマクロはまだ実装されていないのでSQLを直接書く必要がありました。(EffectiveSatelliteのマクロはあるのでもしかしたらそちらで対応できるかもしれません) まとめと次回予告 今回の記事ではDataVaultモデリングの中心となるHub/Link/Satelliteを生成するためのdbtのパッケージの1つであるautomate_dvについて紹介させていただきました。 automate_dvはSQLだけで、Hub/Link/Satelliteの生成クエリを実装するのに比べて、非常に少ない記述量で各種テーブルの実装を行うことができます。 当たり前ですが、マクロによって生成されるSQLがどんなことをしているのかを理解するのが、使いこなす上で重要になってくると思います。 次回は、BusinessVaultや今回の記事で名前だけ出てきているEffectiveSatelliteやStatusTrackingSatelliteの紹介や、ビジネスキーがNullだった場合の対応方法などについて紹介させていただければと思います。 参考資料 automate_dvドキュメントページ 記事を書き始めた時はdbtvaultでしたが、気づいたらautomate_dvに変わってました... ↩