Snowflake FRESHNESSを使った定期的な鮮度テストを実装する

記事タイトルとURLをコピーする

この記事は約3分で読めます。

はじめに

アプリケーションサービス部の鎌田(義)です。
今回は、SnowflakeのFRESHNESSを触ってみます。

概要

データ活用基盤では日々データソースからDWHへデータを収集、蓄積していきますが、
様々なイレギュラーが起こり得るかと思います。

最新のデータが反映されているか、というのもデータ品質を担保する上で重要な要素となります。
例えば、SnowpipeでS3などのデータソースから定期的にSnowflakeにデータを収集する運用をしている場合、
データソース側の障害や通信経路上での何らかの障害で、 イベント通知がSnowpipeに到達しないケースも考えられるかと思います。

上記のようなケースでは、
収集先のテーブル側で鮮度テストを定期的に行い最新データが収集されていることを確認する、というのも有効な手段かと思います。

本記事では、FRESHNESSを使用して定期的に監視を行う仕組みを実装してみます。

実装

以下のような流れで実装します。

  1. FRESHNESSの定期実行を対象テーブルに定義
  2. CREATE ALERT で監視設定

※FRESHNESSは、EnterpriseEditionで使用できるDMF (data metric functions) の一つです。

docs.snowflake.com

1. FRESHNESSの定期実行を対象テーブルに定義

docs.snowflake.com

まずは鮮度テストの監視対象となるテーブルを作成します。
鮮度テストの対象となるカラムは以下のいずれかのデータ型である必要があります。

  • DATE
  • TIMESTAMP_LTZ
  • TIMESTAMP_TZ
CREATE OR REPLACE TABLE <テーブル名> (
col1 NUMBER,
col2 VARCHAR,
loaded_at TIMESTAMP_LTZ
);

適当なデータを挿入します

INSERT INTO <テーブル名> (col1, col2, loaded_at) VALUES (1, 'test1', current_timestamp());

作成したテーブルにFRESHNESSの定期実行を定義します

-- 5分おきにDMFが実行されるようセットします
ALTER TABLE <テーブル名> SET
DATA_METRIC_SCHEDULE = '5 minute';
-- DMF(FRESHNESS)をテーブルに定義します
ALTER TABLE <テーブル名>
ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.FRESHNESS
ON (loaded_at);
-- テーブルにDMFが設定されたことを確認します
SELECT * FROM TABLE(INFORMATION_SCHEMA.DATA_METRIC_FUNCTION_REFERENCES(
REF_ENTITY_NAME => '<テーブル名>',
REF_ENTITY_DOMAIN => 'TABLE'));

2. CREATE ALERT で監視設定

鮮度テストでNGがあった時の通知先として使用する通知統合を作成します。
今回はEmailへ通知を飛ばすよう設定しますが、通知先にSNSを指定することも可能です。

CREATE NOTIFICATION INTEGRATION <通知設定名>
TYPE=EMAIL
ENABLED=TRUE
ALLOWED_RECIPIENTS=('<メールアドレス>');

前回正常に評価されたアラートから現在スケジュールされたアラートまでの間に実行された鮮度テストの結果を対象に
タイムスタンプ列の値が300秒(5分)を経過しているかどうか1分おきに監視し、
結果がTrue(鮮度テストNG)の場合はメールを送信するよう設定しています。

CREATE OR REPLACE ALERT <アラート名>
WAREHOUSE = <WAREHOUSE>
SCHEDULE = '1 minute'
IF( EXISTS(
SELECT value
FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS
WHERE measurement_time BETWEEN SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
AND SNOWFLAKE.ALERT.SCHEDULED_TIME()
AND table_name = <テーブル名>
AND metric_name = 'FRESHNESS'
AND value > 300
))
THEN
CALL SYSTEM$SEND_EMAIL(
'<通知設定名>',
'<送信先メールアドレス>',
'<メールタイトル>',
'<メール本文>'
);

作成されたアラートは中断状態の為、次のコマンドで開始します。

ALTER ALERT <アラート名> RESUME;
-- 手動でアラートを実行する場合は以下コマンド
EXECUTE ALERT <アラート名>;

最初に投入したデータが5分以上古い場合、メール通知が届いているはずです。

お片付け

動作確認ができたら作成した各リソースを削除しておきます。

-- 作成したアラートを削除
DROP ALERT <アラート名>;
-- テーブルのDMF設定を解除
ALTER TABLE <テーブル名>
DROP DATA METRIC FUNCTION SNOWFLAKE.CORE.FRESHNESS
ON (loaded_at)
;

DMFの実行で消費されたクレジットは以下で確認できます。

SELECT *
FROM SNOWFLAKE.ACCOUNT_USAGE.DATA_QUALITY_MONITORING_USAGE_HISTORY
WHERE START_TIME >= CURRENT_TIMESTAMP - INTERVAL '1 days'
;

おわりに

今回は、DMFを活用したデータ品質の監視を設定してみました。
どなたかの参考になれば幸いです。

参考

docs.snowflake.com docs.snowflake.com

鎌田 義章 (執筆記事一覧)

2023年4月入社 アプリケーションサービス本部ディベロップメントサービス3課