TECH PLAY

Embulk

イベント

該当するコンテンツが見つかりませんでした

マガジン

技術ブログ

この記事は「 ファインディエンジニア #1 Advent Calendar 2025 」の24日目の記事です。 沢山のアドベントカレンダー記事が執筆されていますので、年末のお供に是非読んでみてください。 adventar.org はじめに ソフトウェアエンジニアの 土屋(@shunsock) です。私の所属するデータソリューションチームでは、ファインディ全体のデータ活用を推進するためのデータ基盤を構築しています。 今回、我々はデータ基盤のRDSとBigQueryのテーブル同期システム (EL Pipeline) のリプレースを行い、 DuckDBを本番導入 しました。本稿では、活用に至った経緯と実際に組みこむにあたる課題、および成果を紹介します。 はじめに ファインディにおけるテーブル同期システムの立ち位置 リプレイスの背景 補足 技術選定 Datastream DuckDB Datastream, DuckDB両採用の理由 システム設計 概要 CLIを挟む理由 GitHub Actionsからの起動にした理由 複数のRDSを転送する 開発運用と成果 開発・運用してみた感想 可読性と拡張性が高い とはいえまだまだ新興のソフトウェア まとめ ファインディにおけるテーブル同期システムの立ち位置 ファインディでは、ウェブアプリケーションをAWS上のECSとRDS、データ基盤をGoogle CloudのBigQueryで作成しています。 このような構成を取っているため、AWSのRDSとGoogle CloudのBigQueryを同期してテーブルを最新にする必要があります。 次の図は、Findy Tools事業部における、現在のデータフローの概念図です。AWS上に存在するRDSのデータをBigQueryに転送していることが分かります。 リプレイスの背景 弊社では従来、OSSのEL(Extract Load) ツール  Embulk  をECSに載せて長期間運用していました。弊社で利用しているRDBMSやデータウェアハウスに対応している他、社内に知見を持った方が在籍しているためです。 しかし、近年では、 Embulkのエコシステムのレガシー化や 長期的なメンテナが不足 が課題 となっています。特に、 将来のメンテナンスが不透明な点は、セキュリティインシデントに繋がりかねない ため危惧していました。 また、 Embulkの起動の遅さも課題 にしていました。我々はBigQueryプラグインなどを利用していたため、JVM上でさらにJRuby VMを立ちあげます。このような構成は テーブル同期の遅さに繋がり、ECSの課金額を増やす要因 となっていました。 このように、システムを堅牢にすることと処理スピード向上による料金のコストダウンが今回のプロジェクトの目的でした。 補足 Embulkのメンテナーの方も 「オープンソース・プロジェクトのたたみ方」 というブログ記事で脆弱性について次のように述べています。 おそらくいくつかの攻撃は既に成功していて、私たちのソフトウェア・サプライチェーンには、悪意のあるコードがとっくに入り込んでいる、と認識しておくべきでしょう。 技術選定 Datastream, Spark, その他 ELTツールなど、複数の移行先候補がありました。その中で、データ規模に応じて次の2つから選定することにしました。 Datastream: ニアリアルタイムでの更新が欲しい場合や大規模データの場合 DuckDB: 小規模データの場合 Datastream Datastream は Google Cloudが提供するサーバーレスのCDC (Change Data Capture), Replicationツールです。 CDCは、あるソースのシステムを監視し、そのシステムに対する操作をニアリアルタイムで、ターゲットとなるシステムに反映する仕組みのことです。これによりAWSのRDSに対する変更を即座にBigQueryに反映可能です。 DuckDB DuckDB は高速なアナリティカルデータベースです。s3などのストレージサービスに出力されたログ分析やファイルフォーマットの変換、wasmによるフロントエンドでの活用など広い用途で活用されています。 接続先や出力フォーマットが非常に豊富な他、C++製のマルチスレッドランタイムにより、高速に動作する点が魅力です。 次の写真はDuckDBのPoC時に行なったベンチマークです。小さなテーブルで転送を試したところ、 1.5倍程度の高速でした 。 ソフトウェア名称 平均 標準偏差 最速 最遅 Embulk 253秒 8秒 242秒 261秒 DuckBD 176秒 30秒 137秒 209秒 補足: 実際にパフォーマンステストを行ったときの様子 Datastream, DuckDB両採用の理由 今回のリプレイスでは、コスト最適化を軸に Datastream と DuckDB の2種類のアプローチを使い分ける構成を採用しました。 DatastreamはフルマネージドでサーバーレスなCDCサービスと強力です。一方で、ニアリアルタイム性が不要な小規模データに対しては機能過多となり、費用面でも割高になります。そこで、リアルタイム性を求めない領域では、より軽量でシンプルに扱えるDuckDBを使って同期を行う方針を取りました。 本記事の以降では、上記のうち、DuckDBによってどのようにテーブル同期システムを構築したか、開発運用で見えた知見を説明します。 システム設計 概要 次の画像は我々のDuckDBによるテーブル同期システムの概念図です。 次のように各種ソフトウェアが起動します。 GitHub Actionsの on_schedule でワークフローが起動 ワークフローがECS Fartate Taskを起動 Fargate Taskがコンテナランタイムを起動 コンテナランタイムの中でCLIアプリケーションが起動 CLIアプリケーションが引数と設定ファイルからSQLを生成 CLIアプリケーションがDuckDBでSQLを実行 CLIを挟む理由 DuckDBを直接起動しない理由は、1回の実行で1テーブルずつ送信できるようにするためと、SQLを直接書かずに設定ファイルをインターフェースにするためです。 実際のユーザーの入力インターフェースは次のようなYAMLです。 dataset_id : lake... table_name : table_name select_statement : "hoge, fuga, ..." GitHub Actionsからの起動にした理由 元々のワークフローはEventBridge Schedulerだったのですが、システム障害時にEventBridgeのcronを変更するなど運用負荷が重い状態でした。 DispatcherをGitHub Actionsにすることでボタン操作だけで検証可能にしました。 また、1テーブルずつの送信にしたので、ステージング環境での動作検証も簡単かつ軽量です。ユーザーは次のようなWorkflow Dispatchを起動するだけで動作検証が完了します。 複数のRDSを転送する 現在のFindy Tools事業部のワークフローを見ると分かる通り、複数のRDSを転送する必要がありました。そこで開発用スクリプトを汎用化して動的なビルドやawsコマンドの発火をしています。 開発運用と成果 開発は、私1人で1か月弱でしました。最初の1プロジェクトこそ時間がかかったものの、モノレポ構成にしたおかげで 従来1か月かかった新規データソースの追加が1週間程度になりました。 処理速度については、直列稼動から並列稼動へ変更となったため単純な比較は難しいのですが、 1テーブルあたり約30秒から約10秒に短縮 できました。 すでに他のメンバーからもプルリクエストが届いており、社内でも手応えのある反応を得ています。 開発・運用してみた感想 可読性と拡張性が高い 今回作成したCLIでは次のようなSQLを生成しています。高い拡張性や可読性が良いと改めて感じました。 INSTALL mysql; LOAD mysql; ATTACH '' AS mysqldb ( TYPE mysql); -- 環境変数から取ってくる CREATE TABLE users AS SELECT * FROM mysqldb.table_name; INSTALL bigquery FROM community; LOAD bigquery; ATTACH '' as bq ( TYPE bigquery); DROP TABLE IF EXISTS bq.lake__system_name.table_name; CREATE TABLE bq.lake__system_name.table_name AS SELECT * FROM table_name; DROP TABLE table_name; 拡張についても、次のCore Extensionsの他にCommunity Extensionsがあります。DB以外にもSpreadSheetなど幅広いツールが対応しているので、興味を持った方は確認してみると良いと思います。 duckdb.org とはいえまだまだ新興のソフトウェア DuckDBは新興のソフトウェアということもあり、普通にバグがあったりします。例えば次のIssueは、私がDuckDBのMySQLのプラグインのATTACH句に存在したバグを報告したものです。(既に解決済みです) github.com また、拡張によっては、サポートしているOSが限られていることがあります。私が作成した時期では、BigQuery拡張でarm64 linuxがサポートされておらず、Fargateをamd64で立てていました。なお、こちらも現在は対応しているようです。 github.com まとめ 今回の取り組みで、我々の テーブル同期システムはより高速、堅牢になりました。 さらに、ユーザーインターフェースが洗練され、 チームメンバーの利用しやすいソフトウェアとなりました。 データソリューションチームでは一緒に事業部横断データ基盤を作る仲間を募集しています。気になる方は是非次のフォームからカジュアル面談に応募してみてください!! herp.careers
はじめに こんにちは。商品基盤部・商品基盤2ブロックの 小原 です。私が所属するブロックではお気に入り機能のマイクロサービスを担当しています。 ZOZOTOWNではさらなる成長に向けて、さまざまな リプレイスプロジェクト が進行中です。本記事では、その中でもお気に入り機能のリプレイスについて紹介します。SQL ServerからAurora MySQLへ数十億レコードをゼロダウンタイムで移行するために設計したデュアルデータベース戦略を解説します。 こんな方に読んでもらいたい 段階的なマイクロサービス移行戦略を策定する担当者 ゼロダウンタイム移行の手法を探すアーキテクト Spring Bootでマルチ DataSource を実装する開発者 数十億レコード規模の移行戦略に関心があるデータエンジニア オンプレミスからAWS移行でダウンタイム最小化に課題を抱えるチーム なぜデュアルデータベース構成を採用したのか 背景:オンプレミスからクラウドへの段階的移行 既存システムの状況 ZOZOTOWNでは各機能のマイクロサービス化とAWS移行を段階的に進めており、既に多くの機能がクラウド化されています。お気に入り機能はこの移行プロジェクトの対象の1つで、移行前はオンプレミス環境のSQL Serverで運用されていました。モバイルアプリ(iOS、Android)やウェブアプリ(スマートフォン、PC)からオンプレミスバックエンドへアクセスし、お気に入り機能を提供してきました。 既存のオンプレミスシステムを分析し、お気に入り機能のマイクロサービス化を検討しました。検討の過程で、システム構成に起因する課題が明らかになりました。お気に入り機能は複数の画面や機能から呼び出されています。複数箇所からの呼び出しに対応するため、クライアントアプリケーションの接続先切り替えは機能単位で段階的に進める必要があります。 切り替えを終えた機能は新しいお気に入りマイクロサービスを呼び出します。未切り替えの部分は従来のオンプレミスバックエンドを利用し続けます。 オンプレミスバックエンドとマイクロサービスの双方に並行してリクエストが送られます。同じお気に入りデータに対して両経路から読み書きが行われ、データベース分離時の整合性確保が大きな課題となりました。 お気に入り機能の概要 ZOZOTOWNではユーザーの購入体験を高めるためにお気に入り機能を提供しています。 商品お気に入り:商品をリストに登録し、後から一覧表示や購入へつなげる ブランドお気に入り:ブランドを登録し、商品一覧や新着情報を受け取る ショップお気に入り:ショップを登録し、ショップの取扱商品を取得する お気に入りデータの規模 2025年9月時点のデータ量は次のとおりで、今も増加しています。 商品お気に入り :数十億レコード(メインデータ) ブランドお気に入り :数億レコード ショップお気に入り :数千万レコード 段階的な移行アプローチ 3つのフェーズで段階的に移行を進め、現在はフェーズ1で本番稼働しています。データベース間の同期には Embulk を利用し、SQL ServerからAurora MySQLへの安定的な差分同期を実現します。 フェーズ1(本番稼働中) - SQL Server単体運用 フェーズ2(予定) - デュアルデータベース運用 フェーズ3(目標) - Aurora MySQL単体運用 デュアルデータベース戦略の採用 数十億レコードの無停止移行を実現するため、さまざまな移行方式を検討しました。検討の結果、デュアルデータベース戦略を採用し、ゼロダウンタイムでの安全な移行を実現します。 データ整合性の課題 同じユーザーのデータが2つのシステムに分散 リアルタイムでの同期が困難 機能によってお気に入り状態が異なって見える 移行方式の比較検討 無停止移行を実現するため、複数のアプローチを検討しました。各方式のメリット・デメリットを評価した結果は次のとおりです。 ビッグバン移行 → 不採用 数十時間のダウンタイムが発生する データベースレプリケーション → 不採用 オンプレミスSQL Server→Aurora MySQL間の直接レプリケーションが困難 コストも高額 ETL/CDC同期(定期的なデータ抽出・変換・ロード) → 一部採用 分単位の遅延があり、リアルタイム要件を満たさない デュアルデータベース → ✓ 採用 完全なゼロダウンタイムを実現できる 選択した段階的な移行戦略 デュアルデータベース戦略の概要を次の図に示します。 ゼロダウンタイムを実現するポイント SQL Serverが常にメインデータベースとして稼働 Aurora MySQLは段階的に同期状態を構築 設定変更のみでフェーズ切り替えが可能 デュアルデータベース戦略のリスクとトレードオフ デュアルデータベース構成にはリスクもあります。実際に直面した課題と対策を以下にまとめました。 デュアルデータベース戦略が向かないケースも存在します。 小規模データ(数百万レコード未満)での移行 ダウンタイムが許容できるシステム 運用チームのリソースが限定的な場合 短期間で移行完了が求められる場合 運用面のリスクと対策を整理した表です。 リスク項目 具体的な課題 対策・軽減策 運用負荷の増加 2つのデータベースの監視・メンテナンス・チューニングが必要 監視基盤の統一、SREチーム体制強化 障害時の複雑化 どちらのデータベースで障害が発生したか、影響範囲の特定が困難 詳細なログ設計、障害対応の手順書整備 データ不整合リスク 非同期書き込みによる遅延や失敗時の不整合データ発生 定期的な整合性チェック、補正バッチ処理 技術面の制約と対応方針を整理した表です。 制約項目 影響 対応方針 Spring Events(アプリケーション内イベント機構)の信頼性 プロセス停止時のイベントロスト Embulkによる定期補正で補完 メモリ使用量の増加 2つのコネクションプールとイベント処理でメモリを消費 JVMチューニング、適切なプール設定 トランザクション複雑化 2つのデータベース間で分散トランザクションを扱う必要 結果整合性(eventual consistency)で妥協 ビジネス面で考慮した点は次のとおりです。 移行期間の長期化:デュアル運用期間が数か月に及ぶ可能性 コスト増:Aurora MySQLとSQL Serverの並行運用コスト チーム学習コスト:新技術習得のための時間投資 DataSource とトランザクション制御による段階的な移行戦略 数十億レコードを安全に移行するため、プロパティでデータベースを段階的に切り替える仕組みを開発しました。 DataSource 設定とプロパティ制御 環境変数やプロパティファイルの設定値を変更するだけでフェーズを切り替える仕組みを構築しました。以下はAurora MySQLの接続設定例です。 # application.yml app : config : database : dual # mssql → dual → mysql の段階的変更 datasources : writer : jdbc-url : jdbc:aws-wrapper:mysql://writer-host:3306/favorite-db read-only : false aws : wrapper-plugins : failover wrapper-dialect : aurora-mysql reader : jdbc-url : jdbc:aws-wrapper:mysql://reader-host:3306/favorite-db read-only : true mssql : etc : jdbc-url : jdbc:sqlserver://mssql-host:1433;database=zozoetc read-only : false @ConditionalOnProperty による Repository 切り替え Spring Boot 3 + Java 21をベースにした社内標準スタックでの実装例です。プロパティ値に応じた Repository をDIコンテナに注入します。 重要なポイントは2つあります。 @ConditionalOnProperty でフェーズごとに Repository を切り替えます。デュアルモード時には非同期Spring Eventsを活用します。 // フェーズ1: SQL Server単体モード(現在) @Repository @ConditionalOnDatabaseMssqlEnabled // app.config.database=mssql時に有効 public class FavoriteItemMssqlRepository implements FavoriteItemRepository { @Override public FavoriteItem save(SaveCommand command) { // フェーズ1はシンプル: SQL Serverにのみ保存 return sqlServerDao.insert(command); } } // フェーズ2: デュアルモード(最重要部分) @Repository @ConditionalOnDatabaseDualEnabled // app.config.database=dual時に有効 public class FavoriteItemDualRepository implements FavoriteItemRepository { @Override public FavoriteItem save(SaveCommand command) { // 1. メインのSQL Serverに同期的に保存(確実性優先) var result = sqlServerDao.insert(command); // 2. デュアル戦略のキーポイント: Aurora MySQLへの非同期反映 // Spring Eventsでの非同期データ同期の要所 applicationEventPublisher.publishEvent( new FavoriteItemSavedEvent(command, result.getId())); return result; } } // フェーズ3: Aurora MySQL単体モード(目標) @Repository @ConditionalOnDatabaseMySQLEnabled public class FavoriteItemMySqlRepository implements FavoriteItemRepository { @Override public FavoriteItem save(SaveCommand command) { // Aurora MySQLにのみデータを保存 return mysqlDao.insert(command); } } Repository実装では、以下の3つの設計を組み込みました。 条件付きBean登録: @ConditionalOnProperty で設定値に応じた実装を注入する デュアル戦略の本質:フェーズ2でSQL Serverへ同期書き込み後にSpring Eventsで非同期反映する 段階的移行:設定変更のみで3フェーズを切り替えられ、実装コードを変更しない AbstractRoutingDataSource によるReader/Writer自動振り分け Aurora MySQLの読み取り専用レプリカを使い、トランザクション種別に応じて接続先を自動切り替えします。 Reader/Writer分離により以下を実現します。 Writer(プライマリ)で書き込みを高速化し、一貫性を確保する Reader(リードレプリカ)で読み込みを分散し、プライマリの負荷を軽減する 数十億レコードでも高速な読み取りを維持する @Configuration public class DataSourceConfig { @ConditionalOnMySQLDataSourceRequired @Bean public DataSource mysqlDataSource() { final var routingDataSource = new AbstractRoutingDataSource() { @Override protected Object determineCurrentLookupKey() { // トランザクションの読み取り専用フラグで自動振り分け return TransactionSynchronizationManager.isCurrentTransactionReadOnly() ? RouteFor.READER // @Transactional(readOnly = true) → Reader : RouteFor.WRITER; // @Transactional → Writer } }; // Reader/Writer両方のDataSourceを設定 routingDataSource.setTargetDataSources(Map.of( RouteFor.READER, createReaderDataSource(), RouteFor.WRITER, createWriterDataSource() )); return routingDataSource; } public enum RouteFor { READER, // 読み取り専用レプリカ WRITER, // プライマリ(書き込み用) } } トランザクションのreadOnlyフラグで接続先を自動で振り分けます。 @Transactional(readOnly = true) → Readerに自動ルーティングされる @Transactional → Writerに自動ルーティングされる アプリケーションコードは接続先を意識せず、Spring Bootが制御する 将来的な削除を見据えたトランザクション制御の設計 Aurora MySQLでの単独運用を最終ゴールとし、将来的なコード削除を見据えた設計を採用しました。 Aurora MySQLの DataSource は @Bean で登録し、Spring Boot標準のトランザクション制御機構を使います。フェーズ3での単独運用を見据えてクリーンに実装しました。ZOZO社内の標準的な使い方に沿うため、長期運用しやすい構成です。 SQL Serverは将来削除する予定のため、Spring Bootの標準機構を使わず独立させました。削除時の影響を最小限に抑えられます。 項目 Aurora MySQL SQL Server DataSource のBean登録 @Bean で登録 Bean登録せず独立管理 アプリケーション側の記述 @Transactional (宣言的) @Transactional (宣言的) 内部のトランザクション実装 Spring Boot標準マネージャ AOPで TransactionTemplate 実行 トランザクション境界の管理 Spring Bootが自動管理 TransactionTemplate が制御 例外時のロールバック Spring Boot標準で自動 TransactionTemplate 内で処理 フェーズ3時点のコード 残す 削除 設計思想 長期運用を前提とした標準実装 削除を前提とした分離設計 SQL Serverの実装は削除を前提に設計しました。2つのコンポーネントで構成されています。 1. SQL Server設定クラス(Bean登録なし) Spring Bootの標準機構を使わず、独自の TransactionManager と TransactionTemplate を管理します。独立した管理により、フェーズ3での削除時に他のコンポーネントへの影響を最小限に抑えられます。 @Configuration @ConditionalOnMssqlDataSourceRequired // SQL Serverが必要なフェーズでのみ有効 public class MssqlDatabaseConfig { private final DataSource dataSource; private final PlatformTransactionManager transactionManager; public MssqlDatabaseConfig(ApplicationProperties properties) { // HikariCP設定でSQL Server接続(実際のプロダクション設定) this .dataSource = new TransactionAwareDataSourceProxy(createMssqlDataSource(properties)); // 重要: Spring Boot標準と分離した独自管理(@Primaryではない) this .transactionManager = new DataSourceTransactionManager(dataSource); } // 書き込み用TransactionTemplate(デフォルト分離レベル) @Bean public TransactionTemplate mssqlEtcTransactionTemplate() { var template = new TransactionTemplate(transactionManager); template.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT); return template; } // パフォーマンス最適化: 読み取り用はREAD_UNCOMMITTED @Bean public TransactionTemplate mssqlEtcTransactionTemplateForSelect() { var template = new TransactionTemplate(transactionManager); template.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED); return template; } } 2. AOPによる @Transactional の自動検知と適用 UseCase 層のメソッドに付与された @Transactional アノテーションをAOPで検知し、readOnlyの値に基づいて TransactionTemplate を選択して実行します。開発者は通常通り @Transactional を使うだけで、SQL Serverのトランザクションが自動制御されます。 @ConditionalOnMssqlDataSourceRequired @Aspect public class MssqlTransactionAop { private final TransactionTemplate mssqlEtcTransactionTemplate; private final TransactionTemplate mssqlEtcTransactionTemplateForSelect; // UseCase層パッケージ全体でTransactionTemplateを自動適用 @Around ( "execution(public * jp.zozo.favorite.api.usecase..*(..))" ) public Object transactionJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable { // @Transactional(readOnly = true)の有無でパフォーマンス最適化 var transactionTemplate = isReadOnly(joinPoint) ? mssqlEtcTransactionTemplateForSelect // READ_UNCOMMITTEDで高速化 : mssqlEtcTransactionTemplate; // DEFAULTで確実性 // TransactionTemplate.execute()でプログラマティック制御 return transactionTemplate.execute(status -> { try { // フェーズ3でSQL Server削除時に該当AOPも同時に削除予定 return joinPoint.proceed(); } catch (Throwable e) { throw new RuntimeException(e); } }); } // @Transactional(readOnly = true)の有無を検出するヘルパーメソッド private boolean isReadOnly(ProceedingJoinPoint joinPoint) { var method = ((MethodSignature) joinPoint.getSignature()).getMethod(); // メソッド上の@Transactionalアノテーションを取得し、readOnly属性をチェック return Optional.ofNullable(AnnotationUtils.findAnnotation(method, Transactional. class )) .map(Transactional::readOnly) // readOnly=trueならtrue、falseまたは未設定ならfalse .orElse( false ); // @Transactionalが無い場合はfalse } } UseCase 層での使用例 UseCase 層で @Transactional を使うと、プロパティ設定に応じてフェーズごとのデータベースを利用します。 @Service @RequiredArgsConstructor public class SaveFavoriteItemUseCase { private final FavoriteItemRepository favoriteItemRepository; @Transactional // 書き込み用Writerデータベースに自動ルーティング public FavoriteItemDTO handle(SaveCommand command) { // フェーズ1: SQL Serverのみ、フェーズ2: デュアルモードで自動切り替え return favoriteItemRepository .findByUserAndItem(command.getUserId(), command.getItemId()) .filter(FavoriteItem::isActive) .map(item -> item.update()) // 既存アイテムの更新 .map(favoriteItemRepository::save) // DBへ保存 .orElseGet(() -> favoriteItemRepository.save(command)) // 新規作成 .toDTO(); } } @Service @RequiredArgsConstructor public class GetFavoriteListUseCase { private final FavoriteItemRepository favoriteItemRepository; @Transactional (readOnly = true ) // 読み取り用Readerデータベースに自動ルーティング public FavoriteListDTO handle(GetListCommand command) { // フェーズ2以降: Aurora MySQL Readerでパフォーマンス最適化 return favoriteItemRepository.findFavoriteList(command); } } Spring Eventsによる非同期データベース同期 デュアルモードではSpring Eventsを活用します。SQL Serverへの書き込み成功後にAurora MySQLへ非同期で反映します。 @RequiredArgsConstructor @ConditionalOnDatabaseDualEnabled // デュアルモード時のみ有効 @Service @Transactional public class DataSyncEventListener { private final DataSyncRepository dataSyncRepository; @Async @EventListener public void handleSaveEvent(FavoriteItemSavedEvent event) { // Aurora MySQLへの非同期データ同期 dataSyncRepository.syncToMySQL(event); } @Async @EventListener public void handleUpdateEvent(FavoriteItemUpdatedEvent event) { // Aurora MySQLへの非同期データ同期 dataSyncRepository.syncToMySQL(event); } @Async @EventListener public void handleDeleteEvent(FavoriteItemDeletedEvent event) { // Aurora MySQLへの非同期データ同期 dataSyncRepository.syncToMySQL(event); } } // カスタムアノテーション @Target ({ElementType.TYPE, ElementType.METHOD}) @Retention (RetentionPolicy.RUNTIME) @Documented @ConditionalOnProperty (value = "app.config.database" , havingValue = "dual" ) public @interface ConditionalOnDatabaseDualEnabled {} クライアントアプリケーション移行期の課題と補完 フェーズ2ではマイクロサービスとオンプレミスバックエンドが混在します。オンプレミス経由でSQL Serverに直接書き込まれたデータは、Embulk同期で補完します。 データ整合性の検証機能 デュアル運用中のデータ品質を保つため、読み取り時の検証を自動化しました。 @Service @ConditionalOnDatabaseDualEnabled @Transactional (readOnly = true ) public class DatabaseVerificationEventListener { private final DatabaseVerificationRepository databaseVerificationRepository; @Async @EventListener public void listen(GetFavoriteItemListEvent event) { // 読み取り処理後に非同期で検証を実行 databaseVerificationRepository.verify(event); } } @Repository @ConditionalOnDatabaseDualEnabled public class DatabaseVerificationDomaRepository implements DatabaseVerificationRepository { private final DataDifferenceLogger dataDifferenceLogger; @Override public void verify(GetFavoriteItemListEvent event) { // Aurora MySQLから同一条件でデータを取得 var mysqlData = mysqlDao.selectByCommand(event.command()); // SQL Serverの結果とAurora MySQLの結果を比較 dataDifferenceLogger.difference( event.dto(), // SQL Serverから取得済みの結果 mysqlData, // Aurora MySQLから取得した結果 event.getUserId() ); } } @Component @ConditionalOnDatabaseDualEnabled public class DataDifferenceLogger { public <T> void difference(T mssqlData, T mysqlData, String userId) { if (Objects.equals(mssqlData, mysqlData)) { log.debug( "Data is same, userId = {}" , userId); } else { // 差分検出時はログとSentryへ送信 log.warn( "Data is different, userId = {}" , userId); Sentry.withScope(scope -> { scope.setTag( "userId" , userId); scope.setExtra( "mssqlData" , String.valueOf(mssqlData)); scope.setExtra( "mysqlData" , String.valueOf(mysqlData)); Sentry.captureException( new DataDifferenceException(userId)); }); } } } 両データベースの結果を比較し、差分はSentryで検知します。運用チームがすぐ対応できる仕組みを構築しました。 3つのフェーズによる段階的移行 デュアルデータベース構成への移行を3つのフェーズに分けて進めています。 フェーズ1: SQL Server単体運用(現在) 現在はフェーズ1で本番稼働中です。既存のSQL Serverを活用しながら、マイクロサービス化を先行して進めています。 フェーズ1ではSQL Serverのみでマイクロサービスを稼働し、Aurora MySQLへのデータ移行準備を並行で進めています。 フェーズ2: デュアルデータベース運用(予定) 両データベースを並行稼働させる移行期間で、データ整合性を保ちながら新システムへ切り替えます。 SQL Serverをメインとし、Spring Eventsで非同期にAurora MySQLへ複製します。オンプレミス経由の変更はEmbulk同期で補完します。 フェーズ3: Aurora MySQL単体運用(目標) 最終目標であるクラウドネイティブ環境への完全移行を目指します。 最終的にSQL Server関連コードを削除し、Aurora MySQLのみで運用します。 今回の学び 数十億レコードの無停止移行を実現するデュアルデータベース戦略について、設計思想から実装詳細まで解説しました。 今回の実装で工夫した点は4つです。 プロパティ切り替えによる3フェーズの段階的移行 @Transactional の統一APIで異なる内部実装を使い分け 将来のコード削除を見据えた意図的な設計分離 Spring EventsとEmbulk同期を組み合わせた整合性確保 現在はフェーズ1で安定稼働中です。フェーズ2・3に向けて本番環境とテスト環境の構築、デュアルデータベース運用のテスト手法の確立を進めています。 ZOZOでは、一緒にサービスを作り上げてくれる仲間を募集中です。興味がある方は以下の採用情報をご確認ください。 corp.zozo.com
はじめに こんにちは!Data Strategy teamでデータエンジニアをしているshota.imazekiです。 今回はBigQueryでのINFORMATION_SCHEMAを用いたBigQueryデータ監視というテーマでブログを書いていこうと思います。 BigQueryを利用していく上で「クエリが実行できなくなった」「データが古いまま更新されていない」「使われていないデータがある」などの様々な運用上の課題があるかと思います。それをINFORMATION_SCHEMAで使って簡単に解決していこうという話です。 BASEの分析基盤 まず前提としてBASEの分析基盤を簡単に紹介します。BASEではDWHにBigQueryを採用しています。BIツールとしてはLookerを導入していて、Dailyでデータが同期されるようになっています。ワークフロー管理にはAirflowを利用しており、データ連携部分にはEmbulkやS3 エクスポート機能などを使ってます。連携部分の詳細は こちら のテックブログに書いております。BigQueryはLookerから以外にもデータアナリストなどの分析者が直接クエリを実行することもあります。 BASEの分析基盤(簡略版) データ監視における課題 BASEにおけるデータ監視関連の課題は現状、以下の3点があります。 データが連携されていない、更新されないデータが連携され続けている 分析に使われていないデータが連携されている 重いクエリが実行されている 1. データが連携されていない、更新されないデータが連携され続けている データソース側やデータ連携部分で何かしらの障害が起きて、更新されるべきデータが更新されなくなっていることがあります。またこの逆で、プロダクト側での機能の廃止や外部サービスの利用停止などによって、既に更新されることのないデータを更新し続けているパターンもあります。同じデータで更新し続けている状況です。これらについてはユーザー側からの連絡や不定期の点検時に発見することが多く、対応が遅くなってしまうことが問題でした。 TABLE_STORAGE を使えば簡単にテーブル件数を確認することができます。 select distinct project_id, table_schema, -- データセット名 table_name, total_rows, -- テーブル件数 from [project_name].`region-asia-northeast1`.INFORMATION_SCHEMA.TABLE_STORAGE このテーブルを使ってDailyのデータ連携バッチ実行後などにテーブル件数を記録しておくことで、件数の変化を簡単に追うことができます。Window関数などを使って前日との件数比較を行い、同じ件数であればアラートを鳴らすということも可能になります。BASEではLookerのアラート機能を使ってSlackへ通知しています。 2. 分析に使われていないデータが連携されている BASEでは基本的にはELT形式で一度、BigQueryにデータを連携してからデータマートなどへの加工を行っています。その中には特定プロジェクトのために一時的に作られて、PJ自体は終了したがそのまま放置されているデータマートなどもあります(それによって不要になった元データもあるでしょう)。不要になったデータマートやその加工処理はコストパフォーマンスの観点から削除をした方がいいと考えていますが、こちらの発見も遅れることが多かったため、もっと早く簡単に検知できないかと考えました。 JOBS_BY_PROJECT (ORGANIZATIONなどもある), TABLES を使うことで使われてないテーブルを洗い出すことができます。 JOBYS_BY_PROJECTを使って実行されているクエリから、どのテーブルが利用されているかを見ることができます。クエリの実行履歴については一定期間以内(180日程度)までしか保持されないのでもっと遡りたい時は実行履歴を保存しておくと良いでしょう。 select j.user_email, j.query, t.project_id, t.dataset_id, t.table_id, ROUND (j.total_bytes_processed / ( 1024 * 1024 * 1024 ), 2 ) bytes_processed_in_gb, -- 処理されるGB数も分かる from [project_name].`region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_PROJECT j cross join unnest(j.referenced_tables) t where -- 成功したREAD文のみを取り出す j.job_type = ' QUERY ' and j.statement_type = ' SELECT ' and j.state = ' DONE ' and j.error_result is null その後、TABLESから全テーブルの一覧を取り出し、差分を見ることで使われていないテーブルを洗い出すことができます。 select distinct table_catalog, -- プロジェクト名 table_schema, -- データセット名 table_name from [project_name].`region-asia-northeast1`.INFORMATION_SCHEMA.TABLES 3. 重いクエリが実行されている 前提としてBASEでは以下のような制限をBigQueryに設定しています。 GCPプロジェクト単位でクエリ走査量の制限: Query usage per day ユーザー単位でのクエリ走査量の制限: Query usage per day per user またLookerからの1クエリ単位での走査量の制限も入れています。 これらの制限によって基本的には重いクエリが実行されてもコストが莫大に膨れ上がったり、コスト制限によって利用できなくなったといった問題は防げています。しかし、1ユーザー単位で見た時には数回重いクエリを実行しただけでBigQueryが利用できなくなってしまうこともあり、コスト的にも分析体験としても好ましくありません。 したがって重いクエリが実行された場合にそれを通知できるようにし、クエリの改善もしくはデータマートの構築などを提案できるようにしました。 2同様、 JOBS_BY_PROJECT (ORGANIZATIONなどもある)を利用することで重いクエリを洗い出すことができます。 select j.user_email, j.query, t.project_id, t.dataset_id, t.table_id, ROUND (j.total_bytes_processed / ( 1024 * 1024 * 1024 ), 2 ) bytes_processed_in_gb, -- 処理されるGB数も分かる from [project_name].`region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_PROJECT j cross join unnest(j.referenced_tables) t where -- 例: 100GB以上の走査量を取り出すクエリ (j.total_bytes_processed / ( 1024 * 1024 * 1024 ) > 100 Lookerのアラート機能 を用いることで、定期的に(数分単位や1時間単位など)重いクエリが実行されたかを確認し、Slackに通知することにしています。 またLookerからはサービスアカウントを用いてクエリを実行する都合上、誰か1人がLookerからQuery usage per day per userの上限に引っかかるくらい使ってしまうと他のLookerユーザーにも影響が出てしまいます。その検知も簡単に行えるようにしました。上記の JOBS_BY_PROJECT テーブルなどを日単位でtotal_bytes_processedをsumしたりすることで設定した閾値を超えた場合に通知することができるようになります。 おわりに INFORMATION_SCHEMAを用いることで分析基盤の運用がより便利になる事例を紹介してきました。正直、Data observabilityツール( elementary など)を用いた方がよりDWHの改善などには繋がると思いますが、気軽に始めるならINFORMATION_SCHEMAを使った方が良いかと考えています。BASEでもelementaryの導入などは進めていきたいとは思っていて、このような分析基盤の改善を一緒に行っていくメンバーを募集しています。ご興味のある方は気軽にご応募ください! A-1.Tech_データエンジニア / BASE株式会社 明日のBASEアドベントカレンダーはosashimiさんの記事です。お楽しみに!

動画

該当するコンテンツが見つかりませんでした

書籍

該当するコンテンツが見つかりませんでした