データ基盤を支えるSQL Serverのデータ転送を安定化させた話

f:id:vasilyjp:20190322125652j:plain

こんにちは!
ZOZOテクノロジーズ開発部の中坊(e_tyubo)です。

私の所属しているマーケティングオートメーション(MA)チームでは、ZOZOTOWNやWEAR等の各サービスで蓄積されたデータを集約したデータ基盤の運用を行なっております。我々MAチームはこの集約されたデータを用いて顧客分析を行いマーケティングに活用しています。
今回はその運用の中で生じた問題とその解決方法ついて紹介致します。

概要

データ基盤には定期的にデータを流し込む処理がスケジューリングされており、膨大なデータが毎日転送されています。 現在このデータ転送処理はサービス側のデータベースから直接データを流しておらず、一度ハブとなるデータベースに集約しています。

ZOZOTOWNとWEARはどちらもSQL Serverを使っているため、ハブとなるデータベースもSQL Serverを使っています。 SQL Serverでデータを転送する方法はいくつかありますが、定期的にバッチ処理でデータをまとめて転送する場合に注意すべきポイントがあったため対応を行いました。

この記事ではMAチームが運用しているSQL Server同士のデータ連携で生じた問題と実施した改善内容をご紹介します。

前提

SQL Server同士の連携

まずはデータ連携のシステムを紹介します
ハブとなるデータベースをマーケティング用途に使うことからMarkeデータベースと呼びます。 ZOZOTOWNやWEARのデータベースからMarkeデータベースに連携する流れは下図のようになります。
f:id:vasilyjp:20190319194428p:plain

このようにbcpコマンドを用いてデータのエクスポートとインポートを実現しています。 どのようなクエリでbcp outするかは次に説明する管理テーブルで定義します。

管理テーブル

データ連携はMarkeデータベース上にある管理テーブルの情報をもとに実行されます。この管理テーブルには下記の情報が含まれます。

  • 接続先DBの情報
  • 連携タイミング
  • 連携時に実行するSQL
  • 取得結果を保存するMarkeデータベース上のテーブル名

例えば下記のようなデータが入っていたとすると、指定したタイミングにデータベースに対してSQLが実行され、その結果をMarkeデータベースのテーブルに保存します。

接続先 タイミング クエリ 保存先テーブル名
zozotown_db_1 夜間1回 SELECT * FROM zozotown_db.dbo.TableA TableA
wear_db_1 日中に定期実行 SELECT column1, column2 FROM wear_db.dbo.TableB TableB

現在この管理テーブルに約300テーブル分のデータが入っており、日々データが更新されています。

課題

さて、ここからが本題です。運用上問題になったことは2つあります。

テーブル定義が変わる

サービスを運用しているとテーブル定義の変更は起こり得ます。例えばサービス側のテーブルにカラムを追加すると、エクスポートするデータの形式が変化してしまいます。具体的には下図のように、SELECT * FROM ~を使っているケースで問題が起きます。
f:id:vasilyjp:20190319194510p:plain

カラム追加時はMarkeデータベースにも追加しておかないと、インポートするデータとテーブル定義が異なることでエラーになります。これは実際に何度も発生していました。

そこで、すべての管理テーブルのレコードのクエリをカラム指定することにしました。そうすることで仮にカラムが追加されてもクエリの結果は変化しないため、問題が生じなくなります。もしカラムのデータが必要な場合は追加対応を行えば良いため、要件的にもすぐには問題になりません。

カラムが削除されたりカラムの定義が変わった場合は別途対応が必要になりますが、発生頻度が低いため考慮していません。

ソートされていないデータの大量インサート

データ連携ではデータを全て入れ替えていますが、SELECT時にクラスター化インデックスを定義するカラムと同じ並び順でソートしないとINSERTに時間を要することがあります。
以降「クラスター化インデックスを定義するカラムと同じ並び順でソート」は長いので、「クラスター化インデックスでソート」と呼びます。

INSERTが遅くなる原因について説明します。
SQL Serverはページという容量8KBのまとまりでデータを保存していて、レコードの情報はページの中に含まれています。INSERTする時はクラスター化インデックスで規定された順番通りにデータを挿入しますが、その順序はページの中で保つ必要があります。ページの容量に限界が来るとページの新規作成かページの分割が必要になり、その挙動の違いでパフォーマンスが変わります。

まずクラスター化インデックスでソートされたデータを挿入するケースを考えます。

分かりやすくするために、主キーである整数型のidがクラスター化インデックスになっているケースを想定します。 ページの中に4レコードしか入らないと仮定すると、5レコード目を挿入する時は下図のように新しいページを作成します。
f:id:vasilyjp:20190319194513p:plain

挿入されるデータの順序がクラスター化インデックスの順序と一致しているため、INSERT時にはページの新規作成だけで完結します。

次にクラスター化インデックスでソートされていないケースを考えます。

5レコード目はidの値で並び替えた場合既存のページに含まれる必要がありますが、すでに容量がありません。この時は下図のようにページの分割が発生します。
f:id:vasilyjp:20190319194516p:plain

分割されて新たに生成されたページ1'には空き容量があるため、適切な領域にデータが追加されます。 データの量が多い時はこのページ分割の頻度次第で、パフォーマンスに影響する可能性があります。

適切にソートされていない場合、INSERT済みのデータ量が多いほどページ分割が発生しやすくなります。大量データをbcpでインポートする際にINSERT速度が低下する場合は、今回ご説明したページ分割が大量に発生している可能性を疑います。

ページ分割の発生を最小限にするためには全てのSELECT文にクラスター化インデックスを用いた適切なORDER BY句を設定する必要があります。 現状主キーがクラスター化インデックスに指定されているケースで対応が可能なため、主キーを基準にソートしました。

2つの課題をまとめると、下記の対策が必要です。

  • SELECT句を全てカラム指定にする
  • ORDER BY句を主キーのカラムと同じ並び順で指定する

実際に対応の必要なレコードがかなりの数存在していたため、手動で書き換えると大変手間になることが予想されました。そこで管理テーブルに設定されてるクエリを更新するためのUPDATE文を自動生成することにしました。以下でその手法を紹介します。

改善方法

方針

すでに管理テーブルに入っているレコードを正しい形に修正する必要があるため、UPDATE文を対象となるレコード全てに対して作成します。
実現のためにPythonを使ってODBCドライバー経由でSQL Serverに接続できるpyodbcというライブラリを使いました。これはMicrosoftの公式ドキュメントで言及されているSQL Serverにアクセスする方法です。

Pythonを使った理由はDigdagで利用することを念頭に置いているからです。 MAチームではワークフローを管理するツールとしてDigdagを利用しており、今後ワークフローに乗せることを想定しています。

実装の流れは下記のようになります。

  • 現在連携されているテーブルの連携情報の一覧を取得
  • 対象テーブルの定義を取得
  • 対象テーブルの主キー情報を取得しORDER BY句を生成
  • 既存のクエリにWHERE句が指定されていたら継承する
  • 用意した情報からUDPATEクエリを生成しsqlファイルに出力

テーブル一覧を取得する

まずは管理テーブルから同期対象のテーブル一覧を取ってきます。
この時に対象テーブル名とクエリを全て取得します。

対象テーブルの定義を取得

information_schemaを使ってカラム情報を取得します。
bcpコマンドを使ってインポートする際は、エクスポート時に指定するSELECT句のカラム順序を、インポート先のテーブルのカラム順序と合わせておく必要があります。そのため、ordinal_positionを基準にソートしています。

SELECT 
    table_name, column_name
FROM
    information_schema.columns
ORDER BY
    ordinal_position

取得した情報からテーブル名毎のカラム定義のdictionaryをPythonのロジックの中で用意しておきます。

対象テーブルの主キー情報を取得しORDER BY句を生成

下記のシステムカタログビューを経由してテーブル毎に主キーで使われるカラム名を取得します。
それぞれの名前と用途と取得用クエリは下記の通りです。

名前 用途
sys.tables テーブル名の取得
sys.key_constraints 主キー制約の取得
sys.index_columns 制約の中で使われるindexの取得
sys.columns indexで使われるカラム名の取得
SELECT
    tbls.name, cols.name
FROM
    sys.tables AS tbls
    INNER JOIN sys.key_constraints AS key_const ON tbls.object_id = key_const.parent_object_id AND key_const.type = 'PK'
    INNER JOIN sys.index_columns AS idx_cols ON key_const.parent_object_id = idx_cols.object_id AND key_const.unique_index_id = idx_cols.index_id
    INNER JOIN sys.columns AS cols ON idx_cols.object_id = cols.object_id AND idx_cols.column_id = cols.column_id
ORDER BY
    tbls.name, idx_cols.key_ordinal

複合主キーの場合index内で使われるカラムの順序を合わせる必要があるため、テーブル毎にkey_ordinalを基準にソートしています。このクエリを使うと主キーに使われるカラム名を全て取得できるので、テーブル名毎にORDER BY句を作ります。

既存のクエリにWHERE句が指定されていたら継承する

既存のクエリのロジックを変えないために、既に指定されているWHERE句を抽出して新しいSELECT文に適用する処理が必要になります。
例えば下記のようなクエリがあった場合は、WHERE column2 >= 1を切り取る必要があります。

SELECT
    column1, column2
FROM
    db.schema.table
WHERE
    column2 >= 1

Pythonを使うと下記の様な処理でクエリの中で存在するWHERE句以下を抽出できるので、保持しておきます。

where_start_pos = query.upper().find('WHERE', 0)

if where_start_pos >= 0:
    where = query[where_start_pos:len(query)]

また、WHERE句以降にORDER BYが使われているケースが存在する時は、同様のロジックでORDER BY以降を切り取って適切なものに置き換えを実施しました。

用意した情報からクエリを生成しsqlファイルに出力

これまでの処理でクエリを生成するために必要な情報は揃いました。これらを使ってUPDATE文を生成します。

用意したデータを下記の変数で保持しているとします。

  • target_tables: 連携しているテーブルのリスト
  • column_map: テーブル毎のカラム定義のリスト
  • primary_key_map: テーブル毎の主キーに使われるカラムのリスト
  • where_map: テーブル毎の既存クエリのWHERE句

また、テーブル名毎の更新対象レコードidをrecord_id_mapという変数で持っているものとします。

Pythonの中でのクエリ生成処理の流れは簡略化すると下記の様になります。

update_query_format = "UPDATE manage_table SET select_query = %s WHERE id = %d"
for table in target_tables:
  # テーブル毎に必要な情報を変数に入れる
  column_list = column_map[table]
  primary_key_list = primary_key_map[table]
  where = where_map[table]
  record_id = record_id_map[table]
  # 用意した情報を整形してSELECT文を生成(メソッドの詳細は省略)
  select_query = create_select_query(table, column_list, primary_key_list, where)
  # 用意したフォーマットを置換してUPDATE文を作成
  update_query = update_query_format % (select_query, record_id)
  # 生成されたSQLを必要な場所に出力

最終的に出力されたUPDATE分を実行したら以降はカラム指定でテーブルが連携されるため、対策は完了です。

改善後の成果

まだ実施から3か月ほどしか経っていませんが、以降カラム追加に伴うエラーは発生しておりません。さらに実施後は新しく連携テーブルを追加する際はこの仕組みを使ってSELECT文を生成できるので、人為的なミスがおきにくくなっているだけでなく作業も効率化されています。
ORDER BY句を指定する等のクエリ生成時のルールをロジックに内包できるので、今後仕様の追加があっても容易に仕組に取り込むことが可能になったこともメリットだと思います。

今後の展望

本記事で紹介した機能の延長で、いずれはテーブル定義の差分を自動検知する仕組みを作りたいと思っています。Digdag上でワークフローとして定義し、定期的に必要なALTER文を生成して通知する仕組みを作ることでテーブル定義を常に正しい状態に維持しやすくなると考えています。

まとめ

本記事ではデータ統合のためのデータ連携の仕組みの一部と、それを安定化させるための取り組みを紹介しました。データ連携を改善する1つの方法として参考になれば幸いです。

MAチームではMA基盤上のアプリケーションの開発・運用だけでなく、データ連携の仕組みの開発・運用も行なっており、ビッグデータを活用したデータ基盤の改善に取り組んでいます。目立たない分野ですが非常にサービスへの影響は大きく、挑戦のしがいがあるチームです。
ZOZOテクノロジーズでは、一緒にサービスを作り上げてくれる方を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください。

https://www.wantedly.com/companies/zozo-tech/projects

カテゴリー