TECH PLAY

Embulk

イベント

該当するコンテンツが見つかりませんでした

マガジン

技術ブログ

はじめに こんにちは、WEAR開発部バックエンドブロックの小山です。普段は弊社サービスである WEAR のバックエンド開発を担当しています。 WEARではハイブリッド検索などの新たな検索体験の実現を目指しています。その実現に必要な ハイブリッド検索 はOpenSearch 2.11で導入された機能です。Elasticsearch 7.10.2では利用できないため、Amazon OpenSearch Service上のエンジンをOpenSearch 2.11.0以上へ移行する必要がありました。今回はOpenSearch 2系の最新バージョンだった2.19.0を採用しました。本記事では、この移行にあたり対応したSearchkickの導入、ダブルライト戦略によるインデクシング移行、カナリアリリースによる段階的トラフィック切り替えについてご紹介します。 目次 はじめに 目次 抱えていた課題 Elasticsearch 7.10.2の限界 既存のアーキテクチャ 課題を解決したアプローチ 1. Searchkickとopensearch-rubyへの移行 elasticsearch-modelからSearchkickへ elasticsearchからopensearch-rubyへ 既存Searchableとの並存 2. インデクシングのダブルライト戦略 embulk-outputの変更 RakeタスクとDigdagワークフローの追加 3. クエリ種別ごとの動作確認 確認の目的と方針 確認対象の抽出方法 確認したクエリ種別 確認方法 4. 負荷試験 試験条件 試験結果 5. カナリアリリースによる段階的トラフィック移行 リリーススケジュール 各段階での確認項目 確認結果 効果と得られた知見 移行後のアーキテクチャ Searchkickとopensearch-rubyへの移行による保守性向上 並行稼働時のインデクサー移行方法 カナリアリリースの有効性 おわりに 抱えていた課題 Elasticsearch 7.10.2の限界 WEARではコーディネートや動画、メイクの投稿検索にAmazon OpenSearch Service上でElasticsearch 7.10.2を利用していました。しかし、以下の課題がありました。 新機能の利用不可:WEARではハイブリッド検索などの新たな検索体験を計画していたが、Elasticsearch 7.10.2はハイブリッド検索に対応しておらず、実現できない状態 サポートの先行き不透明:Elasticsearch 7.10.2は、Amazon OpenSearch Serviceで提供される最終のオープンソースElasticsearchバージョン。今後の新機能追加やセキュリティパッチの提供が見込めない状態。Elasticsearch 7.1〜7.8の標準サポートは2025年11月に終了しており、7.10.2も同様のサポート終了が予想される状態。AWS側でもOpenSearchエンジンへの移行を推奨 ライブラリのメンテナンス性: elasticsearch gem 7.14.0以降ではAmazon OpenSearch Service上のElasticsearchへ接続不可。gemのバージョンを7.13.3に固定せざるを得ず、アップデートができない状態 既存のアーキテクチャ WEARの検索基盤は、以下のシステム構成で運用していました。 検索機能: elasticsearch-model gemを利用し、検索メソッドを提供。内部では elasticsearch gemが提供する Elasticsearch::Client を通じてOpenSearch Serviceと通信 マッピング定義: elasticsearch-model gemを利用し、モデルにマッピング定義を記述 インデックス操作: elasticsearch gemを利用し、Rakeタスクによるインデックス作成、エイリアス切り替え、旧インデックス削除、ドキュメント削除 インデクシング:トラフィックを考慮し、レコード更新ごとではなくDigdagワークフローと Embulk による定時バッチ(日次の洗い替えと差分更新)でインデクシング 課題を解決したアプローチ 今回の移行では、既存ドメインのインプレースアップグレードではなく、OpenSearch 2.19.0の新規ドメインを作成し、エンドポイントを段階的に切り替える方法を採用しました。その理由は以下の通りです。 インプレースアップグレードでは、Elasticsearch 7.10.2からOpenSearch 2.19.0へ直接移行できず、 OpenSearch 1.xを経由する必要がある elasticsearch-model / elasticsearch から searchkick / opensearch-ruby へのgem移行が必要であり、アプリケーションコードに破壊的変更が生じる 検索基盤は影響範囲が大きいため、カナリアリリースで段階的にリリースしたい これらを踏まえ、Elasticsearchをダウンタイムなく移行させるために以下のアプローチで段階的に進めました。 Searchkickとopensearch-rubyへの移行 インデクシングのダブルライト戦略 クエリ種別ごとの動作確認 負荷試験 カナリアリリースによる段階的トラフィック移行 1. Searchkickとopensearch-rubyへの移行 移行前後のgemの対応関係は以下の通りです。 責務 Elasticsearch利用時 OpenSearch移行後 検索機能 elasticsearch-model (内部で elasticsearch を利用) searchkick (内部で opensearch-ruby を利用) マッピング定義 elasticsearch-model searchkick インデックス操作 elasticsearch 直接利用 opensearch-ruby 直接利用 elasticsearch-modelからSearchkickへ 検索機能とマッピング定義については、既存の elasticsearch-model の代わりに、 searchkick に移行しました。Searchkickを選定した理由は以下の通りです。 OpenSearchを公式にサポートしている リポジトリが継続的にメンテナンスされている nested型への対応など、 elasticsearch-model との互換性がある reindex時のアトミックなエイリアス切り替えが組み込まれているほか、ハイブリッド検索やセマンティック検索にも対応しており、高度な機能を備えている elasticsearchからopensearch-rubyへ インデックス操作のRakeタスクでは、 elasticsearch を使用していました。OpenSearch移行に伴い、これを opensearch-ruby に置き換えました。 - require 'elasticsearch' - client = Elasticsearch::Client.new(client_options) + require 'opensearch-ruby' + client = OpenSearch::Client.new(client_options) client.indices.update_aliases(...) client.indices.delete(...) opensearch-ruby は elasticsearch とAPIの互換性が高いため、クライアントの初期化部分とエラークラスの変更で、既存のインデックス操作ロジックをそのまま利用できました。 唯一の例外がインデックス作成タスクで、ここではSearchkick経由でマッピング定義を取得して作成しています。 task :create_index , [ :index_name ] => :environment do |_, args| index_class = index_class_name(args[ :index_name ]).singularize.capitalize.constantize index = Searchkick :: Index .new(args[ :index_name ]) model_config = index_class.search_index.index_options # Searchkickからマッピング取得 index.create(model_config) # Searchkick経由で作成 end このように、マッピング定義はSearchkickに一元化しつつ、その他のインデックス操作は opensearch-ruby を直接使用する構成としました。 既存Searchableとの並存 WEARでは、モデルごとに *Searchable というconcernを定義し、 elasticsearch-model を利用した検索用のデータ定義とマッピング定義を集約していました。 移行期間中は、Elasticsearchを利用するサーバーとOpenSearchを利用するサーバーを並行稼働させる必要がありました。そこで、モデルごとに *OpensearchSearchable concernを新設し、既存の *Searchable と並存させる構成をとりました。 既存の *Searchable はElasticsearch用のconcernです。 # 既存: Elasticsearch用 module Searchable extend ActiveSupport :: Concern # elasticsearch-model を利用したデータ定義とマッピング定義 end 新設した *OpensearchSearchable はOpenSearch用のconcernです。 # 新規: OpenSearch用 module OpensearchSearchable extend ActiveSupport :: Concern included do searchkick index_name : Rails .configuration.x.application[ :opensearch ][ :index_name ], settings : Rails .configuration.x.application[ :opensearch ][ :settings ], callbacks : false , merge_mappings : true , mappings : search_mappings def search_data # searchkick を利用したデータ定義 end end module ClassMethods def search_mappings # searchkick を利用したマッピング定義 end end end merge_mappings: true を指定することで、独自に定義したマッピングをSearchkickの自動生成マッピングにマージしています。 callbacks: false を指定することで、Searchkickの自動インデクシングを無効化し、既存のEmbulkによるインデクシングとの競合を防いでいます。 2. インデクシングのダブルライト戦略 移行期間中、ElasticsearchとOpenSearchの両方にデータを投入するダブルライトを実施しました。WEARのインデクシングは日次バッチによる洗い替え方式のため、ダブルライトを開始した時点で既存データも含めてOpenSearchに自動で同期されます。そのため、既存データの移行作業を別途行う必要はありませんでした。 embulk-outputの変更 前述の通り、既存の構成ではEmbulkを介して、BigQueryからデータを取得してElasticsearchにインデクシングしていました。インデクシング時のBigQueryのクエリコストが高額なため、OpenSearchにもインデクシングを行う際に単純にジョブを複製してしまうと、費用が2重に掛かってしまうという課題がありました。 そこで、embulk-outputの出力先をElasticsearchとOpenSearchの両方に向けることで、SQLの実行は一度だけで双方にデータを転送できるようにしました。 移行前はElasticsearchのみに出力していました。 # Elasticsearchへのインデクシング時 out : type : elasticsearch mode : insert nodes : - { host : {{ elasticsearch_host }} , port : {{ elasticsearch_port }}} index : {{ elasticsearch_index }} { % Elasticsearchの設定値 % } ダブルライト時は type: multi を使い、ElasticsearchとOpenSearchの両方に出力しました。 # ElasticsearchとOpenSearchにダブルライトするインデクシング時 out : type : multi outputs : - type : elasticsearch mode : insert nodes : - { host : {{ elasticsearch_host }} , port : {{ elasticsearch_port }}} index : {{ elasticsearch_index }} { % Elasticsearchの設定値 % } - type : elasticsearch mode : insert nodes : - { host : {{ opensearch_host }} , port : {{ opensearch_port }}} index : {{ opensearch_index }} { % OpenSearchの設定値 % } ダブルライトのために embulk-output-multi を新たに導入し、複数出力先への分岐を実現しました。OpenSearch側の出力も type: elasticsearch を指定しています。 embulk-output-elasticsearch はOpenSearchとのAPI互換性により、そのままOpenSearchへの出力にも利用できました。 RakeタスクとDigdagワークフローの追加 OpenSearch向けのインデックス操作のRakeタスクとDigdagワークフローを作成し、OpenSearchに対しても実行できるようにしました。 # 既存のElasticsearchのインデックス作成 +create_index_elasticsearch: sh>: ... rails "elasticsearch:create_index[${index_name}]" # 追加したOpenSearchのインデックス作成 +create_index_opensearch: sh>: ... rails "opensearch:create_index[${index_name}]" 3. クエリ種別ごとの動作確認 OpenSearch移行後にすべてのクエリ種別が正常に動作するかをQA環境で確認しました。 確認の目的と方針 Elasticsearchに送信されるクエリの種別ごとに、OpenSearch上でも同等の結果が返ることを確認しました。クエリ種別が重複するエンドポイントは確認対象外とし、効率的に網羅性を担保しました。 確認対象の抽出方法 確認対象の抽出は以下の手順で行いました。 対象エンドポイントの洗い出し:リポジトリ内でElasticsearchのQueryクラスを呼び出している箇所をリストアップ WEAR Webの対象画面の特定:Webマスタ仕様書から対象エンドポイントが使用されている画面を確認 クエリの特定:APIのリクエストパラメーターから生成されるOpenSearchのクエリJSONを特定し、使用されているクエリ種別を分類 確認したクエリ種別 以下のクエリ種別を対象に、WEAR iOS・Android・Webの各プラットフォームで動作確認を実施しました。 分類 クエリ種別 検索クエリ term 、 terms 、 range 、 nested 、 bool ( filter / must_not / must / should )、 function_score 、 exists ソート sort ページング from 、 size グループ化 collapse 複合検索 msearch 確認方法 WEAR iOS・Android・Webの各プラットフォームで、以下の方法で確認しました。また、対応するRSpecテストを実行し、OpenSearchに対するクエリが正常に動作することはCI上で確認しています。 WEAR iOS・Android:QA環境のAPIに対してcurlコマンドでリクエストを送信し、レスポンスを確認。 WEAR Web:ブラウザ上で対象画面を操作し、APIレスポンスと画面表示を目視確認。 すべてのクエリ種別で正常な動作を確認し、負荷試験に進みました。 4. 負荷試験 本番リリース前に、OpenSearchクラスターがElasticsearch利用時と同等のリクエスト量を処理できるかを確認するため、QA環境で負荷試験を実施しました。 試験条件 QA環境のOpenSearchクラスターを本番環境のElasticsearchと同等のスペックに設定 検索エンドポイントのRedisキャッシュを無効化し、OpenSearchへの直接的な負荷を計測 k6を用いて、各検索エンドポイントに対して本番のピーク帯のMAX rps相当のリクエストを6時間継続 試験結果 レイテンシ :Datadog APMで各検索エンドポイントのp99レイテンシを直近1か月の平均と比較した結果、OpenSearchがボトルネックとなるレイテンシ劣化は観測されなかった エラー :Datadog APMで各検索エンドポイントを確認した結果、OpenSearch起因のエラーは発生しなかった クラスターメトリクス :本番のピーク帯MAX値相当のリクエストを6時間継続した。CPUUtilizationはリクエスト量に対して許容範囲内、JVMMemoryPressureは本番環境と同程度であり、各種メトリクスに大きな影響はなかった この結果をもとに、カナリアリリースによる段階的な本番投入を判断しました。 5. カナリアリリースによる段階的トラフィック移行 本番リリースでは、カナリアリリースによって段階的にトラフィックを移行しました。 リリーススケジュール 日時 内容 2025/9/30 13:00 canary podの作成、APIの正常確認、1%リリース 2025/9/30 17:00 10%リリース 2025/10/1 14:00 50%リリース 2025/10/2 13:30 100%リリース 2025/10/2〜10/6 正常性の継続監視 各段階での確認項目 各段階で以下の項目を確認し、問題がなければ次の段階に進みました。 OpenSearchのレイテンシ比較とエラー確認:Datadog APMでOpenSearchとElasticsearchのレイテンシを比較し、劣化がないことを確認。OpenSearchのエラーがないことを確認。 各検索エンドポイントのレイテンシ比較とエラー確認:Datadog APMで各検索エンドポイントのレイテンシを比較し、劣化がないことを確認。OpenSearch起因のエラーがないことを確認。 クラスターメトリクス:SearchLatency、IndexingLatency、CPUUtilization、JVMMemoryPressureを監視し、劣化がないことを確認。 インデックスの整合性:ElasticsearchとOpenSearchのドキュメント件数に差異がないことを確認。 確認結果 OpenSearchでレイテンシが低い傾向を確認した(平均・最小・最大いずれもOpenSearchの方が高速) OpenSearch起因のエラーが発生しなかった OpenSearchでJVMMemoryPressureがやや高い傾向にあったが、MAXでも60%未満であり問題なかった CPUUtilizationはOpenSearchの方が低い傾向だった 100%リリース後の監視でも劣化が見られず、移行完了を判断した 効果と得られた知見 移行後のアーキテクチャ 移行後の検索基盤は、以下のシステム構成になりました。 検索機能: searchkick gemを利用し、検索メソッドを提供。内部では opensearch-ruby gemが提供する OpenSearch::Client を通じてOpenSearch Serviceと通信 マッピング定義: searchkick gemを利用し、モデルにマッピング定義を記述 インデックス操作: opensearch-ruby gemを利用し、Rakeタスクによるインデックス作成、エイリアス切り替え、旧インデックス削除、ドキュメント削除 インデクシング:既存のDigdagワークフローと Embulk による定時バッチ(日次の洗い替えと差分更新)でのインデクシングを継続 Searchkickとopensearch-rubyへの移行による保守性向上 elasticsearch-model から searchkick 、 elasticsearch から opensearch-ruby に移行し、以下の効果と知見がありました。 OpenSearchの将来的なバージョンアップへの追随が容易になった reindex処理のアトミックなエイリアス切り替えが組み込みで利用可能になった ハイブリッド検索の機能が利用可能になった opensearch-ruby はAPI互換性が高く、Rakeタスクの移行コストが低かった 並行稼働時のインデクサー移行方法 ダブルライト戦略により、以下のメリットがありました。 ElasticsearchとOpenSearchを並行稼働させることで、いつでも切り戻し可能な状態を維持 Embulkを利用した既存のインデクシングパイプラインを最小限の変更で拡張 移行時のクエリコスト増大を防止 Digdagワークフロー層での制御により、アプリケーションコードへの影響を最小化 カナリアリリースの有効性 段階的なトラフィック移行により、以下の知見が得られました。 1%リリースと10%リリースで、JVMMemoryPressureの変動が大きく見られた。これは、リリース後の低トラフィック時にキャッシュヒット率が低いことに起因する可能性が高く、50%リリース以降は安定した。 検索基盤のような影響範囲の大きいミドルウェアの移行にはカナリアリリースが有効であることを実感した。 おわりに 本記事ではWEARにおけるElasticsearch 7.10.2からOpenSearch 2.19.0への移行プロセスを紹介しました。同様の移行を検討している方の参考になれば幸いです。 ZOZOでは、一緒にサービスを作り上げてくれる方を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください。 corp.zozo.com
はじめに こんにちは、商品基盤部の杉浦、小原、寺嶋です。普段はZOZOTOWNのお気に入り基盤・商品レビュー基盤といった商品サブドメインを担当しています。 私たちのチームでは運用コスト削減を目的として、お気に入りデータベースをオンプレミスのSQL ServerからAWS Aurora MySQLへの移行に取り組んでいます。お気に入りデータは数十億レコードに及び、移行中もデータが増え続けるためデータの静止点が作れないという課題がありました。本記事では、この大規模データ移行における初期移行の取り組みと、Embulkを用いた差分同期について紹介します。 なお、新規データの書き込みを担保するダブルライト戦略については 前回の記事 で紹介しています。あわせてご覧ください。 目次 はじめに 目次 お気に入りリプレイスの概要 技術スタックの老朽化 オンプレミスSQL Serverの運用限界 背景・課題 初期移行 制約と課題 検証と最適化 本番移行の結果 得られた学び Embulkによる差分同期 ジョブ設計 ソースDBへの負荷制御 データ整合性の担保 設定管理とチューニング まとめ お気に入りリプレイスの概要 ZOZOTOWNのお気に入り機能は、会員が興味のある商品・ブランド・ショップを登録し、お気に入り一覧から確認できる機能です。まず、ユーザー種別として 会員 と ゲスト会員 の2種類が存在し、それぞれ独立したテーブルで管理されています。お気に入り登録の対象も 商品・ブランド・ショップ の3種類があり、ユーザー種別との掛け合わせにより、合計6パターンのテーブルが移行対象となります。さらに、 過去に削除されたお気に入りの履歴(アーカイブデータ) も保持されており、これらを含めると移行対象のテーブルは多岐に及びます。テーブルによってレコード数は数千万レコードから数十億レコードまで幅があり、合計すると数十億レコード規模のデータ移行となりました。 この構成は長年にわたりZOZOTOWNを支えてきましたが、以下のような課題を抱えていました。 技術スタックの老朽化 ZOZOTOWNは2004年の開始当初からClassic ASP(VBScript)とSQL Serverのストアドプロシージャでビジネスロジックを実装してきました。しかし、VBScriptは開発元のMicrosoftも積極的に開発しておらず、クラウドベンダーのSDKが提供されていないなど技術的な制約が大きくなっていました。こうした背景からZOZOTOWN全体で リプレイスプロジェクト が進められており、お気に入り機能もその一環としてマイクロサービスへの刷新に取り組んでいます。 オンプレミスSQL Serverの運用限界 ZOZOTOWNは運営開始から10年以上にわたりオンプレミス環境でシステムを拡大してきましたが、スケーラビリティや保守コストの面で課題を抱えていました。2017年より ストラングラーフィグパターンによる段階的なマイクロサービス移行 が進められています。お気に入り機能のデータベースもその一環として、オンプレミスのSQL ServerからAWS上のAurora MySQLへの移行が必要でした。しかし、以下の制約がありました。 Read/Writeが常時発生しており、 システム停止を伴う移行は不可能 書き込んでから読み取れるまでの許容タイムラグが短く、 レプリケーション方式では要件を満たせない オンプレミスDBへの設定変更が必要なマネージドサービス(AWS DMS等)は、 他機能への影響を考慮し使用を見送り お気に入りデータが膨大なため、 インデックス設定などのチューニングにも数時間を要する状態 これらの課題を踏まえ、移行方式を設計し技術検証しました。移行戦略の全体像は以下の3フェーズで構成されています。 フェーズ1 : SQL Server単体での運用(移行前) フェーズ2 : SQL ServerとAurora MySQLのデュアル運用(移行期間) フェーズ3 : Aurora MySQL単体での運用(移行完了) フェーズ2におけるダブルライトの仕組みやフェーズ切り替えの実装については 前回の記事 で紹介しています。本記事ではこのフェーズ2にフォーカスします。 背景・課題 初期移行 初期移行は、ソースDB(オンプレミスSQL Server)からターゲットDB(Aurora MySQL)へのデータ一括移行です。全体の流れは以下の通りです。 抽出 : SQL Serverから bcp でCSV出力 転送 : CSVファイルをS3へアップロード ロード : LOAD DATA FROM S3 でAurora MySQLへインポート インデックス構築 : ALTER TABLE でインデックスを追加 制約と課題 今回の初期移行には、以下の制約がありました。 ソースDB(本番稼働中) : 影響を最小限に抑える必要がある ターゲットDB(サービスイン前) : 大胆な最適化が可能 この非対称な条件から、「 抽出は慎重に、インポートは大胆に 」という方針を採用しました。抽出には bcp (Bulk Copy Program)を採用しました。 bcp はSQL Server標準のバルクエクスポートツールであり、SELECT文による抽出と比較して以下の利点があります。 高スループット : 200,000〜500,000行/秒の安定した出力性能 シンプルな運用 : 追加のミドルウェアやライセンスが不要 転送ではS3を中継することで、ロード失敗時に再抽出せず再実行できる設計としています。 一方、事前試算では最大規模テーブルのインポートに 数日〜1週間 を要することが判明しました。ロード時間が長期化すると、以下のリスクが高まります。 接続切断・タイムアウト : 数日に及ぶ処理は中断リスクが高い 障害時の復旧困難 : 失敗時のデバッグと再実行に多大な時間を要する 移行スケジュールへの影響 : ダブルライト期間が長期化し、運用負荷が増大する ロールバック困難 : 問題発覚時に手戻りできる時間的余裕がなくなる これらのリスクを軽減するため、インポート処理の最適化が必須でした。 検証と最適化 本番移行に先立ち、約6,000万レコードを持つテーブルを用いて3つの観点で検証しました。 1. 並列化の効果 LOAD DATA FROM S3 MANIFEST でマニフェスト分割による並列実行を検証しました。CSVファイルを4分割・8分割・16分割と変化させましたが、スループットは 約51,000〜53,000行/秒で横ばい でした。 今回のAurora構成はProvisioned(単一ライターノード)であり、並列ロードを実行してもCPUおよびストレージI/O帯域がボトルネックとなります。Aurora Serverless v2のような動的スケーリング構成であれば結果が異なる可能性もありますが、今回の構成では並列化による改善は限定的でした。 2. インデックス戦略 方式 内容 処理効率 パターンA インデックスなしでLOAD → 後からALTERで追加 約61,000〜68,000行/秒 パターンB インデックスありでLOAD 約39,000〜42,000行/秒 パターンAが 最大59%高速 でした。行挿入ごとのインデックス更新はランダムI/Oを発生させますが、一括構築ならソート後、シーケンシャルに処理できます。ターゲットDBは未稼働のため、この最適化を採用しました。 3. インスタンスサイズ インスタンスタイプ別のスループットを比較しました。料金は Amazon Aurora の料金 を参照しています。 インスタンス インポート効率 ALTER効率 オンデマンド時間単価 r6i.2xlarge 約125,500行/秒 約120,300行/秒 約$0.63/時 r6i.16xlarge 約162,200行/秒 約162,800行/秒 約$5.00/時 r6i.16xlargeはr6i.2xlargeと比較して約30%のスループット向上が見られた一方、コストは約8倍です。このスループット差がテーブル規模によって処理時間に与える影響は以下の通りです。 大規模テーブル(数十億レコード) : 2〜3時間の短縮 → リスク低減に寄与 小規模テーブル(数千万レコード) : 数分の短縮 → コスト対効果が低い この結果から、大規模テーブルはr6i.16xlargeで時間短縮とリスク低減を図り、中小規模テーブルはr6i.2xlargeでコスト効率を最大化する ハイブリッド戦略 を採用しました。 本番移行の結果 検証結果をもとに本番移行を実施しました。最終的な移行実績は以下の通りです。 テーブル規模 テーブル数 LOAD DATA ALTER TABLE 総所要時間 最大規模(数十億レコード) 2 約4日 約7時間 約4日半 中規模(数億レコード) 1 約3時間 約20分 約3時間 小規模(数千万レコード) 5 約1時間 約10分 約1時間 合計 8 - - 約5日 数十時間に及ぶロードでは、以下のクエリで進捗を監視しました。 SET @target_rows = ?; -- 目標件数(テーブルの総行数) SET @thread_id = ?; -- 監視対象のスレッドID SELECT CONCAT ( ' Thread ' , trx.trx_mysql_thread_id) AS target_name, CONVERT_TZ(trx.trx_started, ' UTC ' , ' Asia/Tokyo ' ) AS 開始時刻_JST, ROUND (TIMESTAMPDIFF(SECOND, trx.trx_started, UTC_TIMESTAMP()) / 3600 , 2 ) AS 経過時間 _ 時間, trx.trx_rows_modified AS 挿入済み行数, @target_rows AS 目標件数, ROUND (trx.trx_rows_modified / TIMESTAMPDIFF(SECOND, trx.trx_started, UTC_TIMESTAMP()), 1 ) AS スループット _ 行毎秒, ROUND (trx.trx_rows_modified / @target_rows * 100 , 2 ) AS 進捗率 _ パーセント, ROUND ( (@target_rows - trx.trx_rows_modified) / (trx.trx_rows_modified / TIMESTAMPDIFF(SECOND, trx.trx_started, UTC_TIMESTAMP())) / 3600 , 2 ) AS 残り時間 _ 時間, DATE_ADD( CONVERT_TZ(NOW(), ' UTC ' , ' Asia/Tokyo ' ), INTERVAL ROUND ( (@target_rows - trx.trx_rows_modified) / (trx.trx_rows_modified / TIMESTAMPDIFF(SECOND, trx.trx_started, UTC_TIMESTAMP())) ) SECOND ) AS 完了見込み時刻_JST FROM information_schema.innodb_trx trx WHERE trx.trx_mysql_thread_id = @thread_id; information_schema.innodb_trx の trx_rows_modified から処理済み件数を取得し、経過時間で割ってスループットを算出します。目標件数との差分から残り時間と完了見込み時刻を推定し、数日に及ぶ処理においても見通しを立てられるようにしました。 得られた学び 学び 根拠 並列化は万能ではない マニフェスト分割を試みたが、単一ノードのI/O帯域がボトルネックとなり効果は限定的でした。闇雲に並列化するのではなく、律速段階を特定することが重要です インデックスは後付けが基本 ロード後に一括構築することで最大59%高速化。行挿入ごとのインデックス更新はランダムI/Oを発生させるが、一括構築ならソート後シーケンシャルに処理できる インスタンスサイズはテーブル規模で使い分ける 大規模テーブルはr6i.16xlargeで時間短縮とリスク低減、小規模テーブルはr6i.2xlargeでコスト効率を最大化。スループット向上率とコスト増加率のバランスを見極める 必ず本番同等データでリハーサルする 6,000万レコードでの検証結果を数十億レコードに線形外挿すると誤差が生じる。I/Oやメモリの振る舞いはデータ規模で変化するため、全量リハーサルが不可欠 やり直せる設計が安心を生む S3を中継することでロード失敗時も再抽出不要で再実行できる。数日かかる処理では「失敗しても復旧できる」という安心感が運用の質を高める この工程が安定したことで、後続の増分同期フェーズへ安全に進められました。 Embulkによる差分同期 初期移行が完了した後も、オンプレミスのSQL Serverには新規データが書き込まれ続けます。この増加分をAurora MySQLへ反映するため、 Embulk を用いた差分同期の仕組みを構築しました。 図中の「 マスタ 」はマイクロサービスがSQL Serverをマスタ(書き込みの主系)として参照・更新することを示しています。「 非同期 」はマイクロサービスがSQL Serverと同じ結果をAurora MySQLへ非同期に反映されることを示しています。「 保存 」はEmbulkジョブ完了後に差分の起点となる状態(config-diff)をS3へアップロードすることを指しています。「 復元 」は次回ジョブ起動時にS3からその状態をダウンロードすることを指しています。これにより前回の続きから差分取得を再開できます。 ジョブ設計 Embulkのインクリメンタル同期では、 updated_at のような更新日時カラムを差分キーとして利用するのがベストプラクティスです。しかし、今回の移行元テーブルはInsert/Deleteのみの操作で設計されており、レコードの更新(Update)が発生しないため updated_at に相当するカラムが存在しません。このテーブルの特性を踏まえ、操作種別ごとに差分キーを使い分ける設計を採用しました。 1つのテーブルに対して役割の異なる最大3つのジョブを用意しています。 ジョブ種別 インクリメンタル列 対象レコード 通常ジョブ 登録日( registered_at ) 新規追加されたレコード 削除ジョブ 削除日( deleted_at ) 論理削除されたレコード アーカイブジョブ 連番ID 削除テーブルへ移動済みのレコード 通常ジョブは登録日、削除ジョブは削除日をそれぞれ基準にレコードを取得します。 -- 通常ジョブ WHERE registered_at >= :registered_at -- 削除ジョブ WHERE deleted_at IS NOT NULL AND deleted_at >= :deleted_at アーカイブジョブでは、Embulkの before_load と after_load フックを活用し、以下の3ステップを1つのジョブ内で完結させています。 out : mode : merge_direct before_load : > UPDATE watermark SET id = (SELECT COALESCE(MAX(id), 0) FROM archived_favorites) after_load : > DELETE FROM favorites WHERE EXISTS ( SELECT 1 FROM archived_favorites WHERE archived_favorites.favorite_id = favorites.id AND archived_favorites.id >= (SELECT id FROM watermark) ) before_load でロード前のアーカイブテーブルの最大IDをウォーターマークとして記録し、 after_load でウォーターマーク以降の新規アーカイブ分に対応するお気に入りレコードを物理削除します。ウォーターマークがなければアーカイブテーブル全レコードが削除対象となり、毎回全件スキャンが発生します。ウォーターマークにより、今回のジョブで追加された差分だけに処理を限定しています。この設計により、お気に入り商品・ブランド・ショップの各テーブルに対してゲスト・会員の2種類を掛け合わせた複数パターンの差分同期を体系的に管理しています。 ソースDBへの負荷制御 差分同期では稼働中のオンプレミスSQL Serverからデータを読み取ります。本番サービスへの影響を抑えるため、複数のパラメータで負荷を制御しました。 # 共通入力設定(抜粋) in : type : sqlserver transaction_isolation_level : NOLOCK # ロック競合を回避 fetch_rows : 1000 # メモリ消費を抑制 SELECT TOP 10000 -- 1回あたりの取得行数を制限 registered_at, id, member_id, ... FROM favorites WITH (NOLOCK) WHERE registered_at >= :registered_at ORDER BY registered_at OPTION (MAX_GRANT_PERCENT = 25 ) -- クエリのメモリグラント上限を設定 NOLOCK ヒントでロック競合を回避し、 TOP N 句で1回あたりの取得行数を制限しています。 fetch_rows でJDBCのフェッチサイズを制御し、 MAX_GRANT_PERCENT でSQL Serverのクエリメモリグラント上限を設定しました。 また、embulk-input-sqlserverのインクリメンタルロードでは、対応する列型が整数型・文字列型・ datetime2 型に 限定されています 。しかし、移行元テーブルの日時カラムは smalldatetime 型であり、そのままではインクリメンタル列として使用できません。この制約の回避策として、クエリ内で CAST(削除日カラム AS DATETIME) と明示的に型変換しています。 データ整合性の担保 差分取得では > ではなく >= を使用しています。 > の場合、同一タイムスタンプに複数レコードが存在すると一部を取りこぼすリスクがあります。 >= では前回の最終レコードを重複取得する可能性があります。しかし、Embulkの出力モードを merge_direct に設定すれば、重複分はUPSERTとして吸収されます。 out : mode : merge_direct 「取りこぼし」と「重複」のトレードオフにおいて、 重複を許容しつつ冪等性で吸収する 方針を採用しました。 差分の起点となる状態管理にも工夫が必要でした。Embulkは --config-diff オプションにより、前回処理の最終レコード( last_record )をYAMLファイルに記録します。 in : last_record : [ '2023-12-23T09:00:30.000000' ] out : {} しかし、Kubernetes Jobとして実行する場合、Podはジョブ完了後に破棄されます。ローカルファイルシステム上の差分状態は失われるため、S3に永続化する仕組みを構築しました。 ジョブ開始時にS3から前回の差分状態をダウンロード Embulkによる差分同期の実行と差分状態の更新 ジョブ完了時に更新された差分状態をS3にアップロード ここで、ダウンロードとアップロードの失敗は致命的エラーとしてジョブを失敗させます。 設定管理とチューニング 複数パターンの設定ファイルは、対象テーブルやカラム名が異なるものの接続情報やパラメータは共通しています。EmbulkのLiquidテンプレート機能を活用し、共通部分を3つのテンプレートに集約しました。 共通テンプレート 役割 入力設定 SQL Server接続情報、トランザクション分離レベル、フェッチサイズ 出力設定 MySQL接続情報、出力モード SELECT句生成 環境変数に基づく TOP N 句の条件付き生成 個別の設定ファイルでは共通テンプレートをインクルードし、テーブル名・カラム名・WHERE句のみを定義します。SELECT句の共通テンプレートでは、環境変数が未設定の場合は TOP 句自体を生成せず、設定されている場合のみ行数制限を付与する条件分岐を実現しています。これにより、本番環境では制限なし、検証環境では制限ありという切り替えが可能です。 負荷制御パラメータ( TOP N 、 fetch_rows 、 MAX_GRANT_PERCENT 等)もすべて環境変数に切り出しており、コンテナイメージの再ビルドなしに変更を反映できます。テーブル単位で処理時間を計測してボトルネックを特定し、検証環境での調整結果を本番環境へ反映するサイクルを効率的に回せる設計です。 まとめ 本記事では、ZOZOTOWNのお気に入りデータベースにおける数十億レコード規模のデータ移行について、初期移行の最適化とEmbulkを用いた差分同期の取り組みを紹介しました。 初期移行では、インデックスの後付けやテーブル規模に応じたインスタンスサイズの使い分けにより、約5日間で全テーブルの移行を完了しました。差分同期では、 updated_at カラムが存在しない制約に対し、役割の異なる複数ジョブを設計することで、サービス無停止のまま増分データの反映を実現しました。 大規模データ移行やEmbulkによる異種DB間の差分同期を検討されている方にとって、本記事が参考になれば幸いです。今後はAurora MySQL単体運用への切り替えを進め、お気に入り機能のマイクロサービス化を完遂していきます。 ZOZOでは、一緒にサービスを作り上げてくれる方を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください。 corp.zozo.com
この記事は「 ファインディエンジニア #1 Advent Calendar 2025 」の24日目の記事です。 沢山のアドベントカレンダー記事が執筆されていますので、年末のお供に是非読んでみてください。 adventar.org はじめに ソフトウェアエンジニアの 土屋(@shunsock) です。私の所属するデータソリューションチームでは、ファインディ全体のデータ活用を推進するためのデータ基盤を構築しています。 今回、我々はデータ基盤のRDSとBigQueryのテーブル同期システム (EL Pipeline) のリプレースを行い、 DuckDBを本番導入 しました。本稿では、活用に至った経緯と実際に組みこむにあたる課題、および成果を紹介します。 はじめに ファインディにおけるテーブル同期システムの立ち位置 リプレイスの背景 補足 技術選定 Datastream DuckDB Datastream, DuckDB両採用の理由 システム設計 概要 CLIを挟む理由 GitHub Actionsからの起動にした理由 複数のRDSを転送する 開発運用と成果 開発・運用してみた感想 可読性と拡張性が高い とはいえまだまだ新興のソフトウェア まとめ ファインディにおけるテーブル同期システムの立ち位置 ファインディでは、ウェブアプリケーションをAWS上のECSとRDS、データ基盤をGoogle CloudのBigQueryで作成しています。 このような構成を取っているため、AWSのRDSとGoogle CloudのBigQueryを同期してテーブルを最新にする必要があります。 次の図は、Findy Tools事業部における、現在のデータフローの概念図です。AWS上に存在するRDSのデータをBigQueryに転送していることが分かります。 リプレイスの背景 弊社では従来、OSSのEL(Extract Load) ツール  Embulk  をECSに載せて長期間運用していました。弊社で利用しているRDBMSやデータウェアハウスに対応している他、社内に知見を持った方が在籍しているためです。 しかし、近年では、 Embulkのエコシステムのレガシー化や 長期的なメンテナが不足 が課題 となっています。特に、 将来のメンテナンスが不透明な点は、セキュリティインシデントに繋がりかねない ため危惧していました。 また、 Embulkの起動の遅さも課題 にしていました。我々はBigQueryプラグインなどを利用していたため、JVM上でさらにJRuby VMを立ちあげます。このような構成は テーブル同期の遅さに繋がり、ECSの課金額を増やす要因 となっていました。 このように、システムを堅牢にすることと処理スピード向上による料金のコストダウンが今回のプロジェクトの目的でした。 補足 Embulkのメンテナーの方も 「オープンソース・プロジェクトのたたみ方」 というブログ記事で脆弱性について次のように述べています。 おそらくいくつかの攻撃は既に成功していて、私たちのソフトウェア・サプライチェーンには、悪意のあるコードがとっくに入り込んでいる、と認識しておくべきでしょう。 技術選定 Datastream, Spark, その他 ELTツールなど、複数の移行先候補がありました。その中で、データ規模に応じて次の2つから選定することにしました。 Datastream: ニアリアルタイムでの更新が欲しい場合や大規模データの場合 DuckDB: 小規模データの場合 Datastream Datastream は Google Cloudが提供するサーバーレスのCDC (Change Data Capture), Replicationツールです。 CDCは、あるソースのシステムを監視し、そのシステムに対する操作をニアリアルタイムで、ターゲットとなるシステムに反映する仕組みのことです。これによりAWSのRDSに対する変更を即座にBigQueryに反映可能です。 DuckDB DuckDB は高速なアナリティカルデータベースです。s3などのストレージサービスに出力されたログ分析やファイルフォーマットの変換、wasmによるフロントエンドでの活用など広い用途で活用されています。 接続先や出力フォーマットが非常に豊富な他、C++製のマルチスレッドランタイムにより、高速に動作する点が魅力です。 次の写真はDuckDBのPoC時に行なったベンチマークです。小さなテーブルで転送を試したところ、 1.5倍程度の高速でした 。 ソフトウェア名称 平均 標準偏差 最速 最遅 Embulk 253秒 8秒 242秒 261秒 DuckBD 176秒 30秒 137秒 209秒 補足: 実際にパフォーマンステストを行ったときの様子 Datastream, DuckDB両採用の理由 今回のリプレイスでは、コスト最適化を軸に Datastream と DuckDB の2種類のアプローチを使い分ける構成を採用しました。 DatastreamはフルマネージドでサーバーレスなCDCサービスと強力です。一方で、ニアリアルタイム性が不要な小規模データに対しては機能過多となり、費用面でも割高になります。そこで、リアルタイム性を求めない領域では、より軽量でシンプルに扱えるDuckDBを使って同期を行う方針を取りました。 本記事の以降では、上記のうち、DuckDBによってどのようにテーブル同期システムを構築したか、開発運用で見えた知見を説明します。 システム設計 概要 次の画像は我々のDuckDBによるテーブル同期システムの概念図です。 次のように各種ソフトウェアが起動します。 GitHub Actionsの on_schedule でワークフローが起動 ワークフローがECS Fartate Taskを起動 Fargate Taskがコンテナランタイムを起動 コンテナランタイムの中でCLIアプリケーションが起動 CLIアプリケーションが引数と設定ファイルからSQLを生成 CLIアプリケーションがDuckDBでSQLを実行 CLIを挟む理由 DuckDBを直接起動しない理由は、1回の実行で1テーブルずつ送信できるようにするためと、SQLを直接書かずに設定ファイルをインターフェースにするためです。 実際のユーザーの入力インターフェースは次のようなYAMLです。 dataset_id : lake... table_name : table_name select_statement : "hoge, fuga, ..." GitHub Actionsからの起動にした理由 元々のワークフローはEventBridge Schedulerだったのですが、システム障害時にEventBridgeのcronを変更するなど運用負荷が重い状態でした。 DispatcherをGitHub Actionsにすることでボタン操作だけで検証可能にしました。 また、1テーブルずつの送信にしたので、ステージング環境での動作検証も簡単かつ軽量です。ユーザーは次のようなWorkflow Dispatchを起動するだけで動作検証が完了します。 複数のRDSを転送する 現在のFindy Tools事業部のワークフローを見ると分かる通り、複数のRDSを転送する必要がありました。そこで開発用スクリプトを汎用化して動的なビルドやawsコマンドの発火をしています。 開発運用と成果 開発は、私1人で1か月弱でしました。最初の1プロジェクトこそ時間がかかったものの、モノレポ構成にしたおかげで 従来1か月かかった新規データソースの追加が1週間程度になりました。 処理速度については、直列稼動から並列稼動へ変更となったため単純な比較は難しいのですが、 1テーブルあたり約30秒から約10秒に短縮 できました。 すでに他のメンバーからもプルリクエストが届いており、社内でも手応えのある反応を得ています。 開発・運用してみた感想 可読性と拡張性が高い 今回作成したCLIでは次のようなSQLを生成しています。高い拡張性や可読性が良いと改めて感じました。 INSTALL mysql; LOAD mysql; ATTACH '' AS mysqldb ( TYPE mysql); -- 環境変数から取ってくる CREATE TABLE users AS SELECT * FROM mysqldb.table_name; INSTALL bigquery FROM community; LOAD bigquery; ATTACH '' as bq ( TYPE bigquery); DROP TABLE IF EXISTS bq.lake__system_name.table_name; CREATE TABLE bq.lake__system_name.table_name AS SELECT * FROM table_name; DROP TABLE table_name; 拡張についても、次のCore Extensionsの他にCommunity Extensionsがあります。DB以外にもSpreadSheetなど幅広いツールが対応しているので、興味を持った方は確認してみると良いと思います。 duckdb.org とはいえまだまだ新興のソフトウェア DuckDBは新興のソフトウェアということもあり、普通にバグがあったりします。例えば次のIssueは、私がDuckDBのMySQLのプラグインのATTACH句に存在したバグを報告したものです。(既に解決済みです) github.com また、拡張によっては、サポートしているOSが限られていることがあります。私が作成した時期では、BigQuery拡張でarm64 linuxがサポートされておらず、Fargateをamd64で立てていました。なお、こちらも現在は対応しているようです。 github.com まとめ 今回の取り組みで、我々の テーブル同期システムはより高速、堅牢になりました。 さらに、ユーザーインターフェースが洗練され、 チームメンバーの利用しやすいソフトウェアとなりました。 データソリューションチームでは一緒に事業部横断データ基盤を作る仲間を募集しています。気になる方は是非次のフォームからカジュアル面談に応募してみてください!! herp.careers

動画

該当するコンテンツが見つかりませんでした

書籍

該当するコンテンツが見つかりませんでした