こんにちは、MA基盤チームの田島です。私達のチームではMAIL、LINE、PUSH通知といったユーザへの配信をしています。その中でもマス・セグメント配信という一斉に行う配信では、配信対象者のセグメント抽出にBigQueryを利用しています。また、配信前に必要なデータをBigQueryに連携しデータマートの集計をしたり、配信後には配信実績の登録などの更新処理をしています。 そのような処理を定期的に行っているため、ネットワークの問題やサーバーの不調などにより処理が途中で失敗することがあります。そこで、リトライを容易にするため、すべての処理を冪等にしました。今回その中でも、BigQueryの追記処理に絞ってどのように冪等化したのかについて紹介します。 目次 目次 マス・セグメント配信基盤の紹介 課題 冪等化 BigQuery追記処理に関する冪等化の取り組み 冪等にならないケース INSERT 初期データ 最初のINSERT処理 INSERT後のデータ INSERT処理をリトライ リトライ後のデータ DestinationTableのWRITE_APPEND 初期データ 参照元のデータ 最初のWRITE_APPEND処理 WRITE_APPEND後のデータ WRITE_APPEND処理をリトライ リトライ後のデータ 3種類の冪等化 Overwrite 例)データ連携 例)データマート更新 使い所 Copy Table & Append 例)データ連携 使い所 Create Table & Merge 例)SELECT結果をAPPEND ひと工夫 もうひと工夫 使い所 比較 冪等化の結果 まとめ マス・セグメント配信基盤の紹介 まず、各配信の流れを簡易化したものが以下になります。 上記の流れはDigdagというワークフローエンジンを利用することで実現しています。 Digdagの基盤については、以下のテックブログをご参照ください。 techblog.zozo.com 課題 バッチ処理においては、ネットワークの問題やサーバーの不調により予期せぬタイミングで処理が異常終了することもあります。発生時にはDigdagのリトライ機能を利用し、処理単位ごとに自動リトライしています。しかし、なにも考えずにリトライするとデータが重複して登録されたり、同じ配信を複数回行ってしまうといった問題が起こり得ます。 BigQueryの更新処理については、最近導入された Transactionの仕組み があります。この機能を使うことで一連のクエリをTransactionの中で完結できます。しかし、クエリの更新処理がすべて完了しCommitの完了後、アプリケーションやDigdagで異常が発生することもあります。そのときリトライすると同じトランザクション処理を再度実行してしまい、同じ処理が重複して実行されてしまいます。 このような事象はかなりレアケースではありますが、安定したバッチ処理をするにはこの問題に関しても対応する必要があります。 冪等化 上記の課題を解決するためにデータの更新処理を含め、すべての処理を冪等化しました。それにより、処理が失敗した場合はただリトライをするだけで整合性が担保されるようにしました。そして、リトライは自動的に行われているので、手動でのオペレーションも不要です。 BigQuery追記処理に関する冪等化の取り組み ここからはBigQueryのデータ処理に限定して、冪等化の取り組みについて紹介していきます。今回扱う更新処理は追記処理に限定しています。 冪等にならないケース BigQueryの追記処理において冪等にならないようなケースについてのパターンを紹介します。今回紹介するケースの他に、自己参照したデータを利用するUPDATE文も冪等にならないことがあります。しかし、私達のシステムではBigQueryにおいてそのような処理を行っていないため、今回は省略し追記処理に限定しました。 INSERT 最初はINSERT文です。BigQueryはユニークキー制約が無いため、INSERT処理は常に追記されることになります。よって、INSERT成功後に処理が失敗しリトライされてしまうとデータの重複が発生します。 PUSH通知の実績テーブルである push_delivered テーブルへのINSERT処理を例に、リトライによりデータが重複するケースを紹介します。 初期データ まず初期データとして以下の2レコードが存在するとします。 user_id message delivered_at 1 message1 2022-07-26 12:00:00 2 message1 2022-07-26 12:00:00 最初のINSERT処理 push_delivered テーブルに対し以下のINSERT処理を行います。 INSERT INTO `project.dataset.push_delivered` (user_id, message, deliverd_at) VALUES ( 3 , " message2 " , " 2022-07-26 12:00:00 " ); INSERT後のデータ INSERT処理が成功すると以下のようにレコードが1行追加されます。これが正しいデータになります。 user_id message delivered_at 1 message1 2022-07-26 12:00:00 2 message1 2022-07-26 12:00:00 3 message2 2022-07-26 13:00:00 INSERT処理をリトライ リトライされた場合、もう一度以下のINSERT処理が行われます。 INSERT INTO `project.dataset.push_delivered` (user_id, message, deliverd_at) VALUES ( 3 , " message2 " , " 2022-07-26 12:00:00 " ); リトライ後のデータ リトライされると、以下のように user_id=3 のデータが重複してしまいます。 user_id message delivered_at 1 message1 2022-07-26 12:00:00 2 message1 2022-07-26 12:00:00 3 message2 2022-07-26 13:00:00 3 message2 2022-07-26 13:00:00 DestinationTableのWRITE_APPEND 続いて紹介するのが、BigQueryのDestinationTableという機能です。 cloud.google.com これは、SELECTした結果をテーブルに書き込むことができる機能で、書き込み方法としては以下の3種類が存在します。 WRITE_EMPTY WRITE_TRUNCATE WRITE_APPEND WRITE_EMPTY と WRITE_TRUNCATE はSELECT文の結果をそのまま指定したテーブルに書き込みます。 WRITE_EMPTY の場合はテーブルのデータが存在しない場合のみ書き込みを行います。そのため、これらの処理は何度実行しても最終的なデータは同じになります。 WRITE_APPEND の場合はSELECT文の結果を指定したテーブルに追記します。そのため、同じ処理を何度も行うとそのたびに同じデータが追記され、重複データが生じてしまいます。以下にデータが重複するケースを紹介します。 初期データ まず先程の例と同じように、初期データとして以下の2レコードが存在するとします。 user_id message delivered_at 1 message1 2022-07-26 12:00:00 2 message1 2022-07-26 12:00:00 参照元のデータ 今回、WRITE_APPENDするデータとして push_delivered_20220727 というテーブルを用意します。 user_id message delivered_at 3 message2 2022-07-27 12:00:00 最初のWRITE_APPEND処理 push_delivered テーブルに対し以下のSELECT処理の結果を追記します。 SELECT * FROM `project.dataset.push_delivered20220727` WRITE_APPEND後のデータ SELECT APPEND処理が成功すると以下のようにレコードが1行追加されます。これが正しいデータになります。 user_id message delivered_at 1 message1 2022-07-26 12:00:00 2 message1 2022-07-26 12:00:00 3 message2 2022-07-26 13:00:00 WRITE_APPEND処理をリトライ リトライされた場合、もう一度以下のSELECT APPEND処理が行われます。 SELECT * FROM `project.dataset.push_delivered20220727` リトライ後のデータ リトライされると、以下のように user_id=3 のデータが重複してしまいます。 user_id message delivered_at 1 message1 2022-07-26 12:00:00 2 message1 2022-07-26 12:00:00 3 message2 2022-07-26 13:00:00 3 message2 2022-07-26 13:00:00 3種類の冪等化 上記で紹介したような処理を冪等化するために、3種類のパターンを利用しています。 Overwrite Copy Table & Append Create Temp Table & Merge それぞれ、どのようなときにどのパターンを利用しているのかについて紹介します。 Overwrite 一番簡単なやり方は、全量のデータをまるごと洗い替えする方法です。以下がデータ更新の流れです。 2つの例を元に、具体的な処理を紹介します。 例)データ連携 最初の例として、PostgreSQLに存在するテーブル push_delivered をBigQueryのテーブルに連携する場合を考えます。ここでは、データ連携にEmbulkというツールを利用したケースを紹介します。 以下がデータ連携を実現するためのEmbulkの設定ファイルです。 in : type : postgresql host : {{ env.postgres_host }} user : {{ env.postgres_user }} password : {{ env.postgres_password }} database : db query : SELECT * FROM push_delivered out : type : bigquery project : {{ env.bq_project }} dataset : {{ env.bq_dataset }} table : push_delivered mode : replace with_rehearsal : false source_format : NEWLINE_DELIMITED_JSON 上記の設定でEmbulkを実行することで、PostgreSQLに存在するテーブル push_delivered の全量データをBigQueryのテーブルに連携します。このとき mode: replace を利用しているため、BigQueryのデータは毎回PostgreSQLのテーブルにあるデータで全量上書きされます。よって、この処理を何回繰り返したとしても、結果はPostgreSQLのテーブルのデータと同じになります。 例)データマート更新 続いてデータマートの更新について考えます。データマートの更新には、DestinationTable機能を利用します。ここではPythonクライアントを使った方法を紹介します。 以下は offer_delivered テーブルにある channel = 'PUSH' のデータを push_delivered マートとしてテーブルを作成している例です。 query=`SELECT offer_delivered WHERE channel = 'PUSH' ` client = bigquery.Client() destination_table = 'project.dataset.push_delivered' job_config = bigquery.QueryJobConfig( destination=destination_table, write_disposition= 'WRITE_TRUNCATE' ) query_job = client.query(query, job_config=job_config) DestinationTable機能のうち WRITE_TRUNCATE を利用しています。 query に格納したSELECT文を実行し、その結果をそのまま push_delivered というテーブルに上書きします。この処理に関しても、参照元のデータが更新されない限り何度実行しても更新対象テーブルである push_delivered のデータは最終的に同じになります。 使い所 この方法はただ単純にデータを全量上書きすれば良いので非常に実装がシンプルになります。しかし、全量のデータを扱うため、データ量が大きいものには向いていません。データ量が大きい場合、データの処理時間や費用増加といったデメリットがあります。 よって、使い所としてはそれほどデータ量が多くないテーブルの連携やデータマートの更新処理に向いています。これから紹介する手法の中でも一番シンプルかつ安定した方法になるため、データ量が許容できるのであればこの方法を利用することをおすすめします。 Copy Table & Append 次は既存のテーブルをコピーしてから INSERT や WRITE_APPEND する方法です。これは、データを差分更新する方法で、洗い替えをするにはデータ量が多すぎる場合に利用できます。以下がデータ更新の流れです。 更新対象のテーブルをバックアップする バックアップしておいたテーブルをtempテーブルにコピーする 追記したいデータをコピーしたテーブルに追記する 最後にtempテーブルを、実際に更新したいテーブルへコピーし、tempテーブルを削除する こうすることで、何度実行しても最終的にはバックアップテーブルに対して更新データ追記したものが更新対象テーブルのデータになります。ここでもデータ連携を具体例として紹介します。 例)データ連携 今回の例は1日1回データ連携することを想定します。以下の例では追記処理としてDestinationTableのWRITE_APPEND機能を利用します。また、ここではテーブルをバックアップしておく代わりに、BigQueryのタイムトラベル機能を利用します。タイムトラベル機能は過去の日時を指定することで、そのタイミングでのデータを参照できる機能です。詳しくは以下のドキュメントを参照してください。 cloud.google.com これを利用することで、テーブルのバックアップを管理する必要がなくなります。ただし、7日間までという成約があるため、1か月ごとのデータ更新等には利用できません。 まずはじめに、差分更新したいデータをPostgreSQLから連携します。これには、Embulkを利用します。以下では、PostgreSQLの push_delivered のうち当日登録されたデータをBigQueryの push_delivered_20220727 というテーブルに連携します。 in : type : postgresql host : {{ env.postgres_host }} user : {{ env.postgres_user }} password : {{ env.postgres_password }} database : db query : SELECT * FROM push_delivered WHERE CAST(delivered_at AS DATE) = current_date out : type : bigquery project : {{ env.bq_project }} dataset : {{ env.bq_dataset }} table : push_delivered_20220727 mode : replace with_rehearsal : false source_format : NEWLINE_DELIMITED_JSON データ連携後、以下のようなスクリプトで差分更新をします。 def copy_table (self, source_table_id, destination_table_id, write_disposition): copy_config = bigquery.CopyJobConfig(write_disposition=write_disposition) job = self.client.copy_table(source_table_id, destination_table_id, job_config=copy_config) job.result() def append (self, source_table_id, destination_table_id, base_time): base_datetime = dt.strptime(base_time, '%Y-%m-%d %H:%M:%S%z' ) snapshot_epoch = int (base_datetime.timestamp()) * 1000 snapshot_table_id = "{}@{}" .format(destination_table_id, snapshot_epoch) temp_table_id = "{}_temp" .format(destination_table_id) self.copy_table(snapshot_table_id, temp_table_id, 'WRITE_TRUNCATE' ) self.copy_table(source_table_id, temp_table_id, 'WRITE_APPEND' ) self.copy_table(temp_table_id, destination_table_id, 'WRITE_TRUNCATE' ) self.client.delete_table(temp_table_id) if __name__ == '__main__' : append(push_delivered_20220727, push_delivered, '2022-07-26 12:00:00+09:00' ) スクリプトの流れは以下のようになっています。 base_time='2022-07-26 12:00:00+09:00' を指定し、更新対象テーブルである push_delivered のいつ時点をバックアップとして扱うのかを設定 バックアップテーブルを 更新対象テーブル_temp という名前でコピー tempテーブルに対して差分データが格納された push_delivered_20220727 のデータを追記 tempテーブルを更新対象テーブルに上書き こうすることで、この処理を何回実行しても push_delivered は半日前のデータに push_delivered_20220727 を追記したデータとなります。任意のタイミングのデータを参照するには テーブル名@UNIXエポック時刻 という名前でテーブルを参照することで取得できます。 また、このとき BigQueryのCopyJob機能 を使っています。DestinationTableのときと同じように WRITE_TRUNCATE WRITE_APPEND をオプションとして指定できます。そのためテーブルのコピー時に、コピー先のテーブルにデータを上書きするか追記するかが選択可能です。CopyJobの利用には料金がかからないため、コスト削減にも繋がります。 使い所 この方法を利用することでデータの差分更新が可能となり、データ量が大きいテーブルに対する処理の更新量を減らすことができます。そのため、処理時間や費用を抑えることができます。ただし、一度データをバックアップからコピーし更新をするため、同じテーブルの更新を同時にできません。また、タイムトラベル機能を利用できない場合はバックアップデータを保持しておく必要があります。 Create Table & Merge 続いての手法は、Copy Table & Appendで扱えなかった、同じテーブルへの並列更新をしたい場合に利用できます。イメージとしてはUPSERT処理に近いです。以下がその流れです。 更新対象のテーブルのスキーマ情報のみをコピー コピーしたテーブルに対してデータを追記 ユニークキーを利用し、テーブル同士をマージ このようにすると、最初の実行ではデータが追記されます。その後リトライされた場合は、MERGE処理のタイミングですでにデータが書き込まれていないデータのみ追記します。よって、この処理を何回実行しても更新対象テーブルは最終的には同じデータになります。 具体例として、SELECT結果を更新対象テーブルに追記したいケースを紹介します。 例)SELECT結果をAPPEND 以下がSELECT結果を更新対象テーブルに追記するためのSQLになります。 CREATE TEMP TABLE insert_data AS SELECT * FROM `project.dataset.offer_delivered` LIMIT 0 ; INSERT INTO insert_data (user_id, message, delivered_at, channel) SELECT user_id, message, delivered_at, ' PUSH ' FROM `project.dataset.push_delivery` MERGE `project.dataset.offer_delivered` target USING insert_data ON (target.user_id = target.user_id and target.message = insert_data.message and target.delivered_at = insert_data.delivered_at) WHEN NOT MATCHED THEN INSERT ROW ; SELECT * FROM APPEND先のテーブル名 LIMIT 0 としてテーブルをコピーし insert_data という一時テーブルを作成 作成したTEMPテーブルに格納したいデータを INSERT MERGE文を使い、更新対象テーブルと一時テーブルをマージ 1の処理にはCopyTable機能も利用できますが、その場合最後に自分でテーブルを削除する必要があります。 LIMIT 0 でのSELECTはスキャンが走らないため料金はかかりません。また、データのマージには条件としてユニークキーを渡す必要があります。 ひと工夫 このとき、テーブルサイズが大きいとMERGE処理時にフルスキャンが走るため、処理時間および料金の増加が発生します。そこで更新対象テーブルをパーティション分割しておき、MERGE文の条件にパーティションの条件を入れることでその問題を解決できます。 CREATE TEMP TABLE insert_data AS SELECT * FROM project.dataset.offer_delivered LIMIT 0 ; INSERT INTO insert_data (user_id, message, delivered_at, channel) SELECT user_id, message, delivered_at, ' PUSH ' FROM `project.dataset.push_delivery` MERGE `project.dataset.offer_delivered` target USING insert_data ON (target.user_id = target.user_id and target.message = insert_data.message and target.delivered_at = insert_data.delivered_at and DATE (target.delivered_at) >= CURRENT_DATE ( ' Asia/Tokyo ' )) WHEN NOT MATCHED THEN INSERT ROW ; もうひと工夫 上記で見ていただいたように、この手法だとテーブル毎に同じような複雑なクエリを書く必要があります。また、ユニークキーやパーティションキーに関しても別途管理が必要になります。そこで以下のようなスクリプトでクエリを生成できるようにしました。ここではテンプレートエンジンに Jinja を利用しています。これにより、必要なデータは更新したいテーブル名と、追記したいデータを抽出するSELECT文のみになります。 def select_and_append (select_sql, target_table_id, partition_base_time = date.today()): TEMPLATE_PATH = 'write_append.tmpl' client = bigquery.Client() target_table = client.get_table(target_table_id) target_table_column_names = [column.name for column in target_table.schema] time_partition_column = target_table.time_partitioning partition_column_name = None if time_partition_column is not None : partition_column_name = time_partition_column.field if time_partition_column.field is not None else '_PARTITIONTIME' params = { 'select_sql' : select_sql, 'target_table_id' : target_table_id, 'target_table_column_names' : target_table_column_names, 'partition_column_name' : partition_column_name, 'partition_base_time' : partition_base_time } env = Environment(loader=PackageLoader( 'bigquery' , 'templates' )) sql = env.get_template(TEMPLATE_PATH).render(params) query_job = client.query(sql) query_job.result() if __name__ == '__main__' : select_sql = 'SELECT user_id, message, delivered_at, ' PUSH ' FROM `project.dataset.push_delivery` select_and_append(select_sql, ' project.dataset.offer_delivery ') CREATE TEMP TABLE insert_data AS SELECT * FROM `{{ target_table_id }}` LIMIT 0 ; INSERT INTO insert_data SELECT {% for column_name in target_table_column_names %} {{ column_name }} {% if not loop . last %} , {% endif %} {% endfor %} FROM ( {{ select_sql }} ); MERGE `{{ target_table_id }}` target USING insert_data ON ( {% for column_name in target_table_column_names %} target.{{ column_name }} = insert_data.{{column_name}} {% if not loop . last %} and {% endif %} {% endfor %} {% if partition_column_name is not none %} and DATE (target.{{ partition_column_name }}) >= parse_date( ' %Y-%m-%d ' , ' {{ partition_base_time }} ' ) {% endif %} ) WHEN NOT MATCHED THEN INSERT ( {% for column_name in target_table_column_names %} {{ column_name }} {% if not loop . last %} , {% endif %} {% endfor %} ) ROW ; 使い所 この手法を利用することで並列に同じテーブルへの追記処理が実施されても問題なく更新できます。また、上記のように1つのクエリで完結します。ただし他の手法に比べてMERGE処理のタイミングでスキャンが行われるため、処理時間および料金の増加が生じます。追記するデータ量が大きい場合は、MERGE文の条件判定に時間もかかってしまいます。 よって、使い所としては間隔が一定でないような追記処理や、追記が並列で実施されるような処理に向いています。 比較 以上を踏まえ、各処理に関しての比較をまとめました。 手法 メリット デメリット 使い所 Overwirte ・シンプル ・扱うデータ量が大きいと使えない ・並列での更新が不可 データ量が多くない更新処理 Copy Table & Append ・差分更新ができる ・差分更新する方法の中ではシンプル ・一定期間毎の更新で無いと使えない ・並列での更新が不可 ・一定間隔ごとの更新処理 ・データ量が多い場合 Create Table & Merge ・並列での更新が可能 ・複雑 ・データ量が多い場合は考慮すべき点がある ・並列での更新処理を行いたい場合 冪等化の結果 紹介したような仕組みを利用し、バッチ処理すべてを冪等化しました。それにより、データ不整合の危険や重複配信などの潜在的な問題を事前回避できるようになりました。そして、すべての処理に対して自動リトライを入れることで、一時的な問題に対して手動でのオペレーションがなくなりました。以上のように、冪等化によって安定したバッチ処理を実現できました。 まとめ 今回BigQueryの追記処理に関する冪等化の取り組みについて紹介しました。私達のチームでは、安定した仕組み開発がしたい人を募集しています。興味がある方は以下のリンクからご応募ください。 hrmos.co