Safie Engineers' Blog!

Safieのエンジニアが書くブログです

dbtvault改めautomate_dvで実装するDataVaultモデリング

​データ分析基盤グループでデータエンジニアをしている平川です。
DataVaultに関する記事の第2回目となります。(第1回の記事はこちらです)
第2回の記事は、DataVaultモデリングの中心となるHub/Link/Satelliteをdbtのパッケージを利用して作っていくという内容です。

2,3回目の内容が当初と少し変わっていますので、再掲いたします。

第1回: DataVaultってなに?どんな特徴があるの?
第2回: automate_dvを使ってDataVaultモデリングの中心となるテーブルを作ってみてわかったこと ← 今回はここ
第3回: BusinessVault、発展的なSatelliteテーブルやキーがNullだった場合の対処方法についてなど

前回のおさらい

前回の記事では、データウェアハウス設計における1つのアプローチであるDataVaultの特徴やメリットについて説明しました。DataVaultは、柔軟性や拡張性に優れ、大量のデータを効果的に管理することができます。
また、DataVaultを実装するためには、Hub/Link/Satelliteという構造を持ったテーブルを生成する必要があります。これらのテーブルの生成には手間がかかることがありますが、automate_dvというパッケージを使うことでテーブル生成を簡易にすることができます。
今回の記事では、このテーブルの生成をサポートするautomate_dvの紹介と実際のテーブル生成までの手順を解説します。

はじめに

第2弾の記事では、データウェアハウスのテーブル構築方法について、automate_dv1を利用した手法について紹介します。
まずは、前回の内容を振り返りつつ、データウェアハウスの設計に関する用語を簡単に説明します。

用語 意味
ELT データウェアハウスにおけるデータの取り込み方法の1つで、データを抽出してから変換し、最後にロードする手法です
dbt データウェアハウスの構築や管理をするためのオープンソースのツールです。SQL(一部Jinja)でデータパイプラインを記述できます。
DataVault データウェアハウスの設計パターンの1つで、拡張性や柔軟性の高さなどが特徴です。
Hub DataVaultで構築する上での中心となるテーブルで、ビジネスキーとそのハッシュキーで構成されます。
Link DataVaultでのHub同士の関係性を表すテーブルで、関連するHubのハッシュキーと関連する2つのHubのビジネスキーをconcatしてハッシュ化したキーで構成されます。
Satellite HubやLinkのキーに対して付随する属性情報を保持するテーブルです。属性情報をまとめてハッシュ化したHashdiffカラムによって、変化を検知することができます。

次のセクションでは、automate_dvを使用してDataVaultのテーブル構築を行う方法について、手順を解説します。

automate_dvについて

automate_dvとは?

dbtのパッケージの1つで、DataVault2.0モデルに基づいたデータウェアハウスの構築をサポートしてくれます。このパッケージのマクロを利用することで、DataVaultモデリングに関するテーブルのSQL実装を簡単に行うことができます。

便利な点は何?

automate_dvを利用することで、Hub/Link/Satelliteテーブルやハッシュ化したキーを含むテーブルの作成を容易にすることができます。 これにより、SQLの記述量を大幅に削減し、テーブルの実装にかかる時間を短縮することができます。

注意点

automate_dvは便利なツールですが、利用しているプラットフォームによっては使用できないマクロがあるため注意が必要です。
例えば、一部のマクロはRedshiftやPostgreSQLでは利用できません。一方で、Snowflake/BigQuery/MS SQL Serverでは、現在一般提供されているマクロを使用することができます。
プラットフォームによって利用できる機能に違いがあるため、使用する前にドキュメントを参照することをおすすめします。

automate_dvの使い方

インストール方法

automate_dvを使うにはdbtを実行する環境にインストールする必要があります。dbtのプロジェクト直下にある。package.yml に以下のようにautomate_dvパッケージの依存関係を追加し、dpt depsを実行してください。

packages:
  - package: Datavault-UK/automate_dv
    version: 0.9.5

Hub/Link/Satelliteの実装

dbt run or dbt buildを実行することで作成したSQLから各種ビューとテーブルを作成してくれます。 Hub/Link/Satelliteを作成するための各SQLの書き方を以下で説明していきます。

ハッシュキーの生成

Hub/Link/Satelliteを実装するにはそれぞれハッシュ化されたキーとhashdiffが必要になります。 そのため、実装する3テーブルが参照するビューも必要になるため、まずはハッシュキーを含むビューをautomate_dvを使って実装していきます。
実装例として、orders, customers, products, customers_orders, orders_productsという5つのソーステーブルからハッシュキーを含むビューを作成していきます。
hashed_ordersのSQLは下記のようになります。(hashed_orders以外のSQLについては折り畳んでありますので詳細を知りたい方は展開していただければと思います。)

{%- set yaml_metadata -%} -- ①
source_model: -- ②
    orders
derived_columns:
    RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
    ORDER_HASH_DIFF:
    is_hashdiff: true
      columns:
        - ORDER_NAME
        - ORDER_AMOUNT
        - CREATED_AT
  ORDER_HK:
    - ORDER_ID
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{ automate_dv.stage(
            include_source_columns=true,
            source_model=metadata_dict["source_model"],
            derived_columns=metadata_dict["derived_columns"],
            hashed_columns=metadata_dict["hashed_columns"],
            null_columns=none,
            ranked_columns=none,
            )
}} -- ③

① : jinjaの記述方法でyaml形式で各種パラメータを設定する
② : ハッシュキーの設定や参照するモデルの設定などをする
③ : automate_dvのstageマクロを使い各種パラメータを引数に設定する

その他のモデルのクエリ

hashed_products

{%- set yaml_metadata -%}
source_model:
  products
derived_columns:
  RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
  HASH_DIFF:
    is_hashdiff: true
      columns:
        - PRODUCT_NAME
        - PRICE
        - CREATED_AT
  PRODUCT_HK:
    - PRODUCT_ID
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{ automate_dv.stage(
      include_source_columns=true,
            source_model=metadata_dict["source_model"],
            derived_columns=metadata_dict["derived_columns"],
            hashed_columns=metadata_dict["hashed_columns"],
            null_columns=none,
            ranked_columns=none,
            )
}}
hashed_customers

{%- set yaml_metadata -%}
source_model:
  customers
derived_columns:
    RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
    HASH_DIFF:
    is_hashdiff: true
      columns:
        - EMAIL
        - PREFECTURE
        - CREATED_AT
  CUSTOMER_HK:
    - CUSTOMER_ID
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{ automate_dv.stage(
            include_source_columns=true,
            source_model=metadata_dict["source_model"],
            derived_columns=metadata_dict["derived_columns"],
            hashed_columns=metadata_dict["hashed_columns"],
            null_columns=none,
            ranked_columns=none,
            )
}}
hashed_customers_orders

{%- set yaml_metadata -%}
source_model:
  customers_orders
derived_columns:
    RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
    HASH_DIFF:
    is_hashdiff: true
      columns:
        - CREATED_AT
  CUSTOMER_HK:
    - CUSTOMER_ID
  ORDER_HK:
    - ORDER_ID
  CUSTOMER_ORDER_HK:
    - CUSTOMER_ID
    - ORDER_ID
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{ automate_dv.stage(
            include_source_columns=true,
            source_model=metadata_dict["source_model"],
            derived_columns=metadata_dict["derived_columns"],
            hashed_columns=metadata_dict["hashed_columns"],
            null_columns=none,
            ranked_columns=none,
            ) 
}}
hashed_orders_products

{%- set yaml_metadata -%}
source_model:
  orders_products
derived_columns:
    RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
    HASH_DIFF:
    is_hashdiff: true
      columns:
        - CREATED_AT
  ORDER_HK:
    - ORDER_ID
  PRODUCT_HK:
    - PRODUCT_ID
  ORDER_PRODUCT_HK:
    - ORDER_ID
    - PRODUCT_ID
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{ automate_dv.stage(
            include_source_columns=true,
            source_model=metadata_dict["source_model"],
            derived_columns=metadata_dict["derived_columns"],
            hashed_columns=metadata_dict["hashed_columns"],
            null_columns=none,
            ranked_columns=none,
            )
}}

続いて、今作成したハッシュキー含むモデルから、Hub/Link/Satelliteを作成していきます。

Hubの生成

ordersのHubテーブルの作成は以下のSQLのようになります。(customersとproductsのHubテーブルは折りたたみ内にSQLを記載しています)

{{ config(materialized="incremental") }}

{%- set yaml_metadata -%}
source_model:
    hashed_orders
src_pk: ORDER_HK
src_nk: ORDER_ID
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{
    automate_dv.hub(
        src_pk=metadata_dict["src_pk"],
        src_nk=metadata_dict["src_nk"],
        src_ldts=metadata_dict["src_ldts"],
        src_source=metadata_dict["src_source"],
        source_model=metadata_dict["source_model"],
    )
}}

その他のモデルのクエリ

hub_products

{{ config(materialized="incremental") }}

{%- set yaml_metadata -%}
source_model:
    hashed_products
src_pk: PRODUCT_HK
src_nk: PRODUCT_ID
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{
    automate_dv.hub(
        src_pk=metadata_dict["src_pk"],
        src_nk=metadata_dict["src_nk"],
        src_ldts=metadata_dict["src_ldts"],
        src_source=metadata_dict["src_source"],
        source_model=metadata_dict["source_model"],
    )
}}
hub_customers

{{ config(materialized="incremental") }}

{%- set yaml_metadata -%}
source_model:
    hashed_customers
src_pk: CUSTOMER_HK
src_nk: CUSTOMER_ID
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{
    automate_dv.hub(
        src_pk=metadata_dict["src_pk"],
        src_nk=metadata_dict["src_nk"],
        src_ldts=metadata_dict["src_ldts"],
        src_source=metadata_dict["src_source"],
        source_model=metadata_dict["source_model"],
    )
}}

Linkの生成

orderとproductの関係性を記述するLinkテーブルの作成は以下のSQLのようになります。(link_customers_ordersは折りたたみ内にSQLを記載しています。)

{{ config(materialized="incremental") }}

{%- set yaml_metadata -%}
source_model:
    hashed_orders_products
src_pk: ORDER_PRODUCT_HK
src_fk:
    - ORDER_HK
    - PRODUCT_HK
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{
    automate_dv.link(
        src_pk=metadata_dict["src_pk"],
        src_fk=metadata_dict["src_fk"],
        src_ldts=metadata_dict["src_ldts"],
        src_source=metadata_dict["src_source"],
        source_model=metadata_dict["source_model"],
    )
}}

その他のモデルのクエリ

link_customers_orders

{{ config(materialized="incremental") }}

{%- set yaml_metadata -%}
source_model:
    hashed_customers_orders
src_pk: CUSTOMER_ORDER_HK
src_fk:
    - CUSTOMER_HK
    - ORDER_HK
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{
    automate_dv.link(
        src_pk=metadata_dict["src_pk"],
        src_fk=metadata_dict["src_fk"],
        src_ldts=metadata_dict["src_ldts"],
        src_source=metadata_dict["src_source"],
        source_model=metadata_dict["source_model"],
    )
}}

Satelliteの生成

orderの属性情報を記述するSatelliteテーブルの作成は以下のSQLのようになります。(sat_customersとsat_productsは折りたたみ内にSQLを記載しています。)

{{ config(materialized='incremental') }}

{%- set yaml_metadata -%}
source_model: hashed_orders
src_pk: ORDER_HK
src_hashdiff:
  HASH_DIFF
src_payload:
  - ORDER_NAME
  - ORDER_AMOUNT
  - CREATED_AT
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{
    automate_dv.sat(
        src_pk=metadata_dict["src_pk"],
        src_hashdiff=metadata_dict["src_hashdiff"],
        src_payload=metadata_dict["src_payload"],
        src_ldts=metadata_dict["src_ldts"],
        src_source=metadata_dict["src_source"],
        source_model=metadata_dict["source_model"],
    )
}}

その他のモデルのクエリ

sat_customers

{{ config(materialized='incremental') }}

{%- set yaml_metadata -%}
source_model: hashed_customers
src_pk: CUSTOMER_HK
src_hashdiff:
  HASH_DIFF
src_payload:
  - EMAIL
  - PREFECTURE
  - CREATED_AT
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{
    automate_dv.sat(
        src_pk=metadata_dict["src_pk"],
        src_hashdiff=metadata_dict["src_hashdiff"],
        src_payload=metadata_dict["src_payload"],
        src_ldts=metadata_dict["src_ldts"],
        src_source=metadata_dict["src_source"],
        source_model=metadata_dict["source_model"],
    )
}}
sat_products

{{ config(materialized='incremental') }}

{%- set yaml_metadata -%}
source_model: hashed_products
src_pk: PRODUCT_HK
src_hashdiff:
  HASH_DIFFF
src_payload:
  - PRODUCT_NAME
  - PRICE
  - CREATED_AT
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{
    automate_dv.sat(
        src_pk=metadata_dict["src_pk"],
        src_hashdiff=metadata_dict["src_hashdiff"],
        src_payload=metadata_dict["src_payload"],
        src_ldts=metadata_dict["src_ldts"],
        src_source=metadata_dict["src_source"],
        source_model=metadata_dict["source_model"],
    )
}}

automate_dvを使う際の小技

automate_dvのドキュメントでは、stageマクロを実行する際のsource_modelには最新分だけを含んだモデルを対象にするのがベストプラクティスとされています。
ですが、止むを得ず参照するモデルに複数日付が含まれていることがあるかもしれません。この場合、最新分だけが存在する層を作るということもできますが、stageマクロを使いつつ、最新分だけのビューを作成することも可能です。
具体的な手順としては、automate_dv.stage部分をwith句に入れて一時テーブルとして定義し、日付を絞る方法です。以下のように記述することで実現できます

-- パラメータの設定は省略

with stage as (
  {{ automate_dv.stage(
            include_source_columns=true,
            source_model=metadata_dict["source_model"],
            derived_columns=metadata_dict["derived_columns"],
            hashed_columns=metadata_dict["hashed_columns"],
            null_columns=none,
            ranked_columns=none,
            )
  }}
)
select * from stage where loaded_at = 'YYYY-MM-DD'

automate_dvを使用していてハマったポイント

automate_dvは、データウェアハウスの構築において高いパフォーマンスを発揮するツールですが、日本語の情報も少なくハマってしまった時に原因を理解するのに時間がかかると思います。ここでは、実際にDataVault層のSatelliteテーブルを構築する際にハマったポイントについて説明していきます。

履歴化されているデータソースを取り込む場合

Satelliteテーブルに取り込む対象が、前日からの増分のみであれば、automate_dvのSatelliteマクロを使用しても問題ありません。しかし、Satelliteが参照するモデルが過去から現在までの全期間のレコードを持っている場合、予期しない結果になる可能性があります。具体的なサンプルと動作を見ていきましょう。

取り込みたいモデルの構造

カラム名
history_id
customer_id
amount
created_at
loaded_at

対象のモデルのデータの変化

2023-04-02

history_id customer_id amount created_at loaded_at
1 1 1000 2023-04-01 12:00:00 2023-04-02 00:00:00
2 2 1500 2023-04-01 13:00:00 2023-04-02 00:00:00

2023-04-03

history_id customer_id amount created_at loaded_at
1 1 1000 2023-04-01 12:00:00 2023-04-03 00:00:00
2 2 1500 2023-04-01 13:00:00 2023-04-03 00:00:00
3 1 1000 2023-04-02 12:00:00 2023-04-03 00:00:00
4 1 3000 2023-04-02 12:30:00 2023-04-03 00:00:00

2023-04-04

history_id customer_id amount created_at loaded_at
1 1 1000 2023-04-01 12:00:00 2023-04-04 00:00:00
2 2 1500 2023-04-01 13:00:00 2023-04-04 00:00:00
3 1 1000 2023-04-02 12:00:00 2023-04-04 00:00:00
4 1 3000 2023-04-02 12:30:00 2023-04-04 00:00:00
5 3 5000 2023-04-03 15:00:00 2023-04-04 00:00:00
6 1 1000 2023-04-03 16:00:00 2023-04-04 00:00:00

automate_dvを使ってSatelliteテーブルを作った際の結果

SatelliteテーブルのSelect結果

上記のSatelliteテーブルの赤い枠で囲まれた1行目と5行目は同一のレコードになっています。全期間の履歴を保持するようなモデルに対してSatelliteマクロを使うとこのような現象が起きてしまいます。
このような状態になってしまう原因は、automate_dvのマクロを使用することで生成されるSQLを見ることでわかります。
以下生成されたSQLを抜粋

WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASH_DIFF, a.AMOUNT, a.CREATED_AT, a.LOADED_AT, a.RECORD_SOURCE
    FROM hs_order_histories AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT a.CUSTOMER_HK, a.HASH_DIFF, a.LOADED_AT
    FROM (
        SELECT current_records.CUSTOMER_HK, current_records.HASH_DIFF, current_records.LOADED_AT,
            RANK() OVER (
               PARTITION BY current_records.CUSTOMER_HK
               ORDER BY current_records.LOADED_AT DESC
            ) AS rank
        FROM sat_order_histories AS current_records
            JOIN (
                SELECT DISTINCT source_data.CUSTOMER_HK
                FROM source_data
            ) AS source_records
                ON current_records.CUSTOMER_HK = source_records.CUSTOMER_HK
    ) AS a
    WHERE a.rank = 1
),
records_to_insert AS (
    SELECT DISTINCT stage.CUSTOMER_HK, stage.HASH_DIFF, stage.AMOUNT, stage.CREATED_AT, stage.LOADED_AT, stage.RECORD_SOURCE
    FROM source_data AS stage
    LEFT JOIN latest_records
    ON latest_records.CUSTOMER_HK = stage.CUSTOMER_HK
        AND latest_records.HASH_DIFF = stage.HASH_DIFF
    WHERE latest_records.HASH_DIFF IS NULL
)
SELECT * FROM records_to_insert

このSQLでは、ソースモデル(ハッシュキーを含むモデル)とSatelliteテーブルから条件を付与して抽出したテーブルを結合して、新たに追加するレコードを作成しています。
with句の2番目に定義されている最新レコード(latest_records)を見るとRankを使用して最新のレコードを抽出しています。
Satelliteテーブルに追加されるレコードは、ソースデータと最新レコードを結合し、ソースデータにだけ存在するレコードを抽出しています。
(where句でのlatest_records.HASH_DIFF IS NULLでソースデータにだけ存在するレコードを取ろうとしています。)

今回例として取り上げたような、同じハッシュキーに対して、異なるHASH_DIFFが追加されるようなテーブルの場合、Satelliteマクロは予期した動作をしないことがあります。
対応策としては、最初に述べたように、Satelliteテーブルが参照するモデルには最新分しか含まないようにすることです。中間層に手を加えたくない場合は、SQLでSatelliteテーブルに存在しないレコードのみを抽出するようなSQLを書きます。

select distinct
    customer_hk,
    hash_diff,
    amount,
    created_at,
    loaded_at,
    record_source
from hs_order_history as stg
where
    customer_hk is not null
    {% if is_incremental() %}
    and not exists (
        select 1
        from
            (
                select customer_hk, hash_diff
                from {{ this }} as cur
                where
                    stg.customer_hk = cur.customer_hk
                    and stg.hash_diff = cur.hash_diff
            )
    )
    {% endif %}

RawVault層までの実装にautomate_dvを使った感想など

  • 1つのビジネスキーに対して、ログを溜め続けるようなデータソースがある場合、Satelliteマクロを使用する際に生成されるSQLを見ると、余計なレコードが生成されてしまうことがあります。そのため、データソースによってはマクロの使用の有無や参照モデルの修正など、使い分けや手を加える必要があり、少し面倒に感じました。
  • マクロを使用することで、キーのハッシュ化など、SQLで記述する場合には多くのコード量が必要な部分をマクロで置き換えられるのはすごく便利に感じました。
  • 予期しない動作が発生した場合、コンパイルされたSQLを見ることで原因を理解することはできますが、automate_dvの実装を見るとマクロ内でマクロを呼び出しているためautomate_dvでの仕組みを理解するのには時間がかかる印象でした。
  • データソース側で物理削除されている場合、Satelliteではレコードが削除されたかどうかが判断できないので、StatusTrackingSatelliteが欲しくなる場面がありましたが、現在(2023年6月時点)はautomate_dvにマクロはまだ実装されていないのでSQLを直接書く必要がありました。(EffectiveSatelliteのマクロはあるのでもしかしたらそちらで対応できるかもしれません)

まとめと次回予告

今回の記事ではDataVaultモデリングの中心となるHub/Link/Satelliteを生成するためのdbtのパッケージの1つであるautomate_dvについて紹介させていただきました。

automate_dvはSQLだけで、Hub/Link/Satelliteの生成クエリを実装するのに比べて、非常に少ない記述量で各種テーブルの実装を行うことができます。
当たり前ですが、マクロによって生成されるSQLがどんなことをしているのかを理解するのが、使いこなす上で重要になってくると思います。
次回は、BusinessVaultや今回の記事で名前だけ出てきているEffectiveSatelliteやStatusTrackingSatelliteの紹介や、ビジネスキーがNullだった場合の対応方法などについて紹介させていただければと思います。

参考資料

automate_dvドキュメントページ


  1. 記事を書き始めた時はdbtvaultでしたが、気づいたらautomate_dvに変わってました...

© Safie Inc.