TECH PLAY

Qlik

イベント

該当するコンテンツが見つかりませんでした

マガジン

該当するコンテンツが見つかりませんでした

技術ブログ

はじめに こんにちは、データ基盤ブロックの平本( @cisetn )です。 本記事では、ZOZOTOWNのリアルタイムデータ連携基盤の中核である ETL層 を作り直した事例を紹介します。対象はオンプレミスのSQL ServerからBigQueryへリアルタイムにデータを連携する基盤です。そのETL層を Goで実装したプラグイン (実行基盤はFluent Bit)で再設計しました。 ZOZOのリアルタイム連携基盤は2020年に 一度紹介記事を公開しています が、それ以降、段階的にアーキテクチャを見直してきました。本記事はその中でもETL層の再設計にフォーカスします。 想定読者は、リアルタイム連携基盤やストリーミング処理基盤の設計・運用に関わる方です。 本記事で扱うこと、扱わないことは次のとおりです。 扱う :ZOZOのリアルタイム連携の全体像、今回リプレイスした基盤の背景・設計・実装 扱わない :BigQuery側のテーブル設計、SQL Server側のChange Tracking設定、利用側(BI・分析クエリ等) 目次 はじめに 目次 ZOZOのリアルタイムデータ連携の全体像 これまでの変遷 リプレイスに至った背景 顕在化してきた課題 新基盤アーキテクチャ 設計の軸 技術選定:Fluent Bit + Goプラグイン 全体構成 大量のデータをリアルタイムで捌くために考えたこと 新基盤の構成 INPUT内部:取得とエンコードを分けた OUTPUT内部:送信とACK確認を分けた 結果 今後の展望:Change Data Captureへの移行 まとめ ZOZOのリアルタイムデータ連携の全体像 本題の前に、ZOZOにおけるリアルタイム連携の全体像を軽く俯瞰しておきます。本記事のテーマがあくまで「その中のひとつ」であることを共有するためです。 ZOZOではデータソースが多岐にわたります。オンプレミスのものもあれば、クラウド上のものもあり、MySQL、SQL Server、DynamoDBなどさまざまです。当然、差分を検知する手段もソースに応じて変わりますし、連携の実現方式も1つではありません。 マネージド / SaaSで済むケース :例えばMySQL → BigQueryであれば Datastream を利用する 専用のパイプラインを組む必要があるケース :例えばDynamoDB → BigQueryのように、対応するマネージドサービスがない場合は、別途データ連携のパイプラインを構築する必要がある 結果として、ZOZOのリアルタイム連携基盤は 複数系統に分かれて共存 しています。本記事で扱うのは、そのうち オンプレ SQL Server → BigQuery の系統です。本番環境(prd)で 約400のテーブル を連携対象としており、新規の連携依頼も日々発生するため、データ基盤の運用において比重の大きな系統となっています。SQL ServerのChange Tracking機能で変更を検知し、プラグインで取得したレコードをPub/Sub経由でBigQueryに流しています。 これまでの変遷 実は、本記事で扱う系統は今回が初めてのリプレイスではありません。以下の変遷を経ています。 時期 アーキテクチャ 主目的 2020 Qlik Replicate → fluentd + Dataflow → BigQuery 安定性向上 + コスト削減 2024 fluentd + BigQuery Subscription (Dataflow を廃止) コスト削減 2025 プラグインによる ETL 層の再設計 + BigQuery Subscription 効率改善(メモリ・スループット・コスト) 2024年には、ストリーム処理層のDataflowを廃止し、Pub/SubのBigQuery Subscriptionに置き換えるリプレイスが行われました。このフェーズの主目的はコスト削減です。 そして今回、ETL層をプラグインで再設計したのが本記事のテーマです。詳細な背景と目標は次章で述べますが、結果として、コスト削減・メモリ効率の改善・スループット向上・運用課題の解消といった効果につながりました(数値は末尾)。 リプレイスに至った背景 誤解のないよう先に述べておくと、旧基盤の設計が「悪かった」わけではありません。2020年当時、ZOZOのデータ基盤はまさに拡大していくフェーズにあり、リアルタイム連携の需要も増え始めたばかりでした。そうした状況では、プラグインが豊富なfluentdとDataflowのように既存のツールを組み合わせて素早く構築できる構成は合理的な選択だったかと思います。実際、信頼性(データ欠損が起きないこと)は チェックポイント機構 などによって担保できており、長く運用されてきました。チェックポイント機構は、処理済みのChange TrackingバージョンをBigQueryに保持する仕組みです。Pod再起動時はそこから再開できます。 顕在化してきた課題 一方で、運用を続け、データ量や利用要件が増えていく中で、 効率の側面 でいくつかの課題が徐々に顕在化してきました。 メモリ効率 :結果セットを一括でメモリに載せる実装のため、メモリ使用量がデータ量に比例して増加する構造でした。大量更新時のOOMを避けるためには「ピーク時のデータ量」を見越した大きなメモリを常時確保しておく必要があり、データ量が増えるにつれてリソース見積もりの難しさが目立つようになってきました。 コスト :上記のメモリ確保がそのままコストに直結します。メモリがトランザクション単位のデータ量に比例する構造であるかぎり、「ピーク時のデータ量」の見積もりを下回るとOOM直行となります。そのため運用上の工夫(時間帯別のスケーリング等)では本質的な改善が難しく、リソースの常時確保によるコスト増を抱え続けるしかありませんでした。 性能 :逐次処理ベースの実装のため、1トランザクションあたりの規模が大きいテーブルでは、リアルタイム性を保ちにくい場面もありました。 運用 :依存していたコンテナイメージがEOLを迎えており、継続利用にリスクがありました。加えて、内部状態の可視性が低く、障害発生時の原因特定にも時間がかかる状況でした。 一言でまとめると、各所でガタが出始めており、信頼性を維持したまま効率(メモリ・スループット・コスト)の側面を改善するため、リプレイスを検討するタイミングに来ていた、ということです。 新基盤アーキテクチャ 設計の軸 新基盤の設計指針はシンプルで、 キャパシティプランニングの軸を「ピーク時のデータ量」から「単位時間あたりの処理量」に変える ことに尽きます。信頼性(データ欠損が起きないこと)は旧基盤からチェックポイント機構によって担保されており、新基盤でもそのまま引き継いでいます。そのため本記事のテーマは 信頼性を維持したまま、効率(メモリ・スループット・コスト)をどう改善したか です。 技術選定:Fluent Bit + Goプラグイン 今回のリプレイスは、前フェーズ(2024年のDataflow撤廃 + BigQuery Subscriptionへの切り替え)の延長線上にあります。前フェーズで Dataflow関連の費用がまるごと不要になり大きなコスト削減は既に達成済み で、下流(Pub/Sub HubとBigQuery Subscription)も整理されている状態でした。一方でETL層はfluentdベースのまま残っており、メモリ効率とスループットの面で課題が顕在化していたため、今回はその続きとして ETL 層の中身を作り直す ことにしました。下流はそのまま踏襲し、ソース側(Change Tracking設定)にも手を加えません。 このスコープと、既存のPub/Sub Hub構成・BigQueryテーブル設計を維持する制約のもとで、マネージドCDCサービスやOSSのCDCミドルウェアの活用も検討しました。ただし我々のケースでは、既存テーブル設計とPub/Sub Hubへの直接出力をそのまま組み合わせ続けられる選択肢を見つけられず、プラグインとして実装する形に決めました。 採用したのは Fluent Bit + Goプラグイン です。決め手は次のとおりでした。 既存基盤がfluentdベースで運用されていたため、Fluent Bitへの移行が素直 :プラグインモデル・設定構造・デプロイ手順といった運用ノウハウがそのまま活きる INPUT(Change Tracking取得)とOUTPUT(Pub/Sub送信)の挙動を 自分たちで細かく調整できる 。後述の非同期ACK並列確認のような最適化も、プラグインとして自前で書いているからこそ仕込める Fluent BitのBuffer・バックプレッシャー機構をそのまま活用できる Goプラグイン公式サポートにより、後述する並列処理をgoroutineとchannelで素直に書ける 全体構成 以下の図は主要コンポーネントのみを示した簡略図です。 ETL層(Fluent Bit + Goプラグイン)はGKE上で動作します。プラグインは データ取得(INPUT) と Pub/Subへの送信(OUTPUT) の2つで構成されており、それぞれの実装の詳細は次章で扱います。 大量のデータをリアルタイムで捌くために考えたこと 新基盤の設計で常に意識していたのは、「 大量のデータをいかにリアルタイムで捌くか 」という問いでした。データ量が増えてもパイプラインが詰まらず、メモリ消費がデータ量に比例しない構造をどう実装するかを検討しました。前章で述べた「単位時間あたりの処理量を軸にする」方針を、Fluent Bitのパイプライン上に乗せて具体化していった話を、本章で紹介します。 なお、Fluent Bitのパイプライン構造の全体像については、 公式ドキュメント もあわせてご覧ください。 新基盤の構成 Fluent Bitのパイプライン構造はINPUT → Filter → Buffer → Router → OUTPUTという形です。新基盤ではこのうち INPUTとOUTPUTをGoプラグインで実装 しました。チャンク単位の処理やバックプレッシャーといったBuffer周りの機構はFluent Bit Engineが標準で備えています。そのためプラグイン側は INPUTとOUTPUTの"箱の中"の設計に集中できました 。 設計の出発点として、データ取得から送信までの各処理を「どこがボトルネックになるか」で整理し、並列化方針を決めました。 処理 特性 並列化方針 CT取得(クエリ → カーソル) I/O bound(DB側) 単一スレッド(DBがボトルネック) エンコード CPU bound Worker数で並列化 Pub/Sub Publish I/O bound(NW) 非同期APIで並列化 ACK確認 I/O bound(NW待ち) 別Workerプールで並列化 CPU boundとI/O boundを別レーンに分け、それぞれを独立した並列度で動かす設計です。以下、INPUT内部・OUTPUT内部の順で紹介します。 INPUT内部:取得とエンコードを分けた INPUT内部の設計では、メモリとCPUを独立した軸として扱えるようにしました。 メモリの設計 :結果セット全体を展開せず、 カーソルで小分けに読み進める方式 を採用。1回のクエリで読むレコード数 RecordsPerChunk をプラグインの設定で指定でき、本番では 10,000件/チャンク CPUの設計 :取得処理とエンコード処理を 別レーンに分け 、エンコードは複数のWorkerで並列実行 取得とエンコードの間に 中間キュー(jobs queue) を挟むことで、取得側はエンコードの完了を待たずに次のチャンクを先行投入できます。キュー容量がゼロだと直列に戻ってしまうため、本実装では jobs queue の容量をWorker数の5倍 に設定しています。 この構造のもとで、同時にレコード形式でメモリに乗るチャンク数は NumWorkers × 6 個で頭打ちになります。内訳は「jobs queue上の最大 NumWorkers × 5 個 + 各Workerが処理中の1個」です。 同時メモリ上のレコード数 = RecordsPerChunk × (jobs queue + 処理中 Worker) = RecordsPerChunk × (NumWorkers × 5 + NumWorkers) = RecordsPerChunk × NumWorkers × 6 = 10,000 × NumWorkers × 6 例えばNumWorkers = 2なら、データ量に関わらず常に約12万レコード分のメモリしか確保しなくて済みます。100万件規模のトランザクションが流れてきても、結果セット全体を一括ロードしてしまう旧基盤と違ってOOMにはなりません。 なお、Fluent Bit上でカーソル方式を実装するときには工夫が必要でした。Fluent BitはINPUTに対して定期的に「データをちょうだい」と呼び出してくる構造になっており、素朴に書くと毎回新規にクエリを発行してしまいます。それでは結果セットが毎回頭から読み直されてしまうため、 カーソル状態をプラグイン側に持ち越し、呼び出しごとに「続きから」読み進める ようにしました。 OUTPUT内部:送信とACK確認を分けた OUTPUT内部では、 送信処理とACK確認処理を別レーンに分離 しました。Pub/SubのPublishは同期的に書くと「送信 → ACK待ち → 次へ」と直列化してしまい、ACK待ちのネットワークI/Oが支配的になります。これだとスループットがACKレイテンシに律速されてしまうため、両者を分離して並列化する方針を取りました。 送信側 :非同期APIを呼んで即座にFuture相当の結果を受け取り、次へ進む。送信そのものは止まらない 確認側 :受け取ったFutureのACK確認専用のWorkerプールを設け、複数並列で確認する 各メッセージが独立したACKタイムアウトを持つようになり、1件の遅延が後続全体を巻き込む連鎖タイムアウトを構造的に防げるようになりました。 このパターンはPub/Subに限らず、Future / Promiseを返す非同期メッセージングSDKで同様に当てはまる考え方です。 送信そのものではなく、ACK確認の方をスケールさせる という発想を、我々のケースでは設計時に組み込みました。 なお、下流の詰まりに対する保護(バックプレッシャー)はFluent Bit標準の機構が動いており、OUTPUT側で詰まったときにINPUTを自動で止める仕組みが標準で得られています。これがあるおかげで、プラグイン側は「並列にどんどん投げて確認する」シンプルな構造に保てました。 結果 前章で述べたカーソル方式により、メモリ消費はデータ量に依存しなくなりました。prd環境では、ETL Podを載せているGKEクラスタのTotal Memoryが 約240GiBから約40GiBへ、約1/6にまで縮小 し、ETLのGKEコストは約 -66% 下がりました。 環境 リプレイス前 リプレイス後 削減率 prd $2,800 $940 -66% stg $3,200 $1,100 -67% 合計 $6,000 $2,000 -67% (2025年11月実績、ETLのGKEコストのみ・定価ベース) 注:stgはprdよりテーブル数が多く(stgは約500、prdは約400)、絶対額も大きくなっています。 性能面では、逐次処理からWorkerプールによる並列処理へ切り替えました。Worker数を変えるだけでスループットの線形拡張が可能な構造になりました。旧基盤では一部の大規模テーブルで遅延が長くなりやすく、監視の閾値を最大40分まで緩めて運用していました。新基盤では、全テーブル一律10分以内の閾値で安定処理しています。 運用面では、Fluent Bit標準のメトリクスにより内部状態が可視化されました。 fluentbit_input_records_total や fluentbit_output_retries_total などの指標を、GKEのMetrics Explorerから確認できます。実際、リプレイス後に予期せぬ問題が起きた際も、 fluentbit_output_retries_total の急増から原因を切り分けてデバッグできました。また、プラグインを自前で実装しているため、コアな部分まで踏み込んだ調査・修正も可能です。依存していたコンテナイメージのEOLリスクから解放された点も、得られた効果です。 今後の展望:Change Data Captureへの移行 現在はSQL Serverの Change Tracking (CT) を使っていますが、CTは「その行が変わった」ことは検知できても、変更前後の値や中間の変更履歴までは取得できません。 一方、SQL Serverには Change Data Capture (CDC) という、変更の全履歴を捕捉する機能もあります。今後はこのCTからCDCへの移行を視野に入れています。履歴を全て取得できれば、変更前後の差分分析や任意時点の状態再現など、分析側のユースケースを広げられます。 まとめ 本記事では、ZOZOTOWNのリアルタイムデータ連携基盤のETL層を、Fluent Bit + Goプラグインで作り直した事例を紹介しました。リアルタイムデータ連携基盤の設計や運用に取り組む方の参考になれば幸いです。 ZOZOでは、一緒にサービスを作り上げてくれる方を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください。 corp.zozo.com
こんにちは、Data Groupのデータアナリストをしている加藤です。 2024年10月1日よりLuupで働き始め、早くも3か月以上が経過しました。(諸事情により公開が遅れました) 今回はLuupを転職先に選んだ理由と、データアナリストとしての日々の業務についてお話しします。 前職について 新卒で金融機関に入社し、2年半にわたり自治体向けのキャッシュレス決済プロジェクトのデータ分析やデータ関連の事務を担当しました。 1~2年目では、事業の理解を深めながら、BIツール(Tableau・Qlik Sense)を使って、決済状況や予算の進捗状況のダッシュボードや顧客向けレポートの作成をしま
Qlik Senseは直感的な操作で簡単にデータ分析・可視化ができるBIツールです。クリック一つで分析軸の絞り込みや画面遷移が可能で、カラフルで見やすい画面を作成できます。効果的な使用には、データの前処理と軽量化が重要です。

動画

該当するコンテンツが見つかりませんでした

書籍

該当するコンテンツが見つかりませんでした