TECH PLAY

MySQL

イベント

マガジン

技術ブログ

G-gen の杉村です。2026年3月に発表された、Google Cloud や Google Workspace のイチオシアップデートをまとめてご紹介します。記載は全て、記事公開当時のものですのでご留意ください。 はじめに Google Cloud のアップデート Gemini 3.1 Flash-Lite が Preview 公開 IAM で Service account principal sets が使えるようになった BigQuery の Conversational Analytics がアップデート gemini-embedding-2-preview が登場 Cloud Storage に Rapid bucket が登場(GA) Cloud Run への IAP 認証の直接設定が一般公開(GA) BigQuery でグローバルデフォルトロケーションが設定可能に AlloyDB のエンハンストバックアップが一般公開(GA) BigQuery data preparation が Cloud Storage と Google ドライブに対応 Cloud NGFW(Enterprise tier)に URL フィルタリングが登場 Imagen モデルが廃止へ 音楽生成モデル Lyria 3 が Vertex AI 経由で使用可能に(Public Preview) Vertex AI Search の回答モデルに gemini-3.1-pro、gemini-3-flash が登場 AlloyDB と Cloud SQL で Conversational analytics が Preview 公開 TPU7x が提供開始 Google Workspace のアップデート Google Workspace の Gemini アプリの共有(公開リンク)が可能に Gemini in Chrome の対応地域が拡張(日本は未対応) NotebookLM にスライドの PPTX 形式エクスポートなどの機能追加 Google Meet の会議の参加者承認時、疑わしいリクエストが分けて表示される Gemini アプリでこれまでより長く3分間の音楽の生成が可能に Google Workspace に「ゲストアカウント」機能が登場 Google ドライブのランサムウェア検出が Beta 版 → 一般公開(GA) はじめに 当記事では、毎月の Google Cloud(旧称 GCP)や Google Workspace(旧称 GSuite)のアップデートのうち、特に重要なものをまとめます。 また当記事は、Google Cloud に関するある程度の知識を前提に記載されています。前提知識を得るには、ぜひ以下の記事もご参照ください。 blog.g-gen.co.jp リンク先の公式ガイドは、英語版で表示しないと最新情報が反映されていない場合がありますためご注意ください。 Google Cloud のアップデート Gemini 3.1 Flash-Lite が Preview 公開 Gemini 3.1 Flash-Lite (2026-03-03) Gemini 3.1 Flash-Lite が Preview 公開。 「Gemini 2.5 Flash に相当する回答品質」を目指している。Vertex AI や Google AI Studio から利用可能。 IAM で Service account principal sets が使えるようになった Service accounts (2026-03-03) IAM で Service account principal sets が使えるようになった。あるプロジェクト/フォルダ/組織配下の全サービスアカウント or 全サービスエージェントを指すセット。 あるプロジェクト内の全サービスアカウントは principalSet://cloudresourcemanager.googleapis.com/projects/123456789012/type/ServiceAccount 、あるフォルダ内の全サービスエージェントは principalSet://cloudresourcemanager.googleapis.com/folders/123456789012/type/ServiceAgent のように表現される。 IAM 許可ポリシーや拒否ポリシーに、プリンシパルとして書き込める。 BigQuery の Conversational Analytics がアップデート BigQuery release notes - March 09, 2026 (2026-03-09) BigQuery の Conversational Analytics がアップデート。 BigQuery Studio の SQL 実行結果から会話をスタートできる AI.FORECAST、AI.DETECT_ANOMALIES、AI.GENERATE に対応 ObjectRef 対応で非構造化データが扱える パーティション列で効率的にクエリをする ジョブ履歴に {‘ca-bq-job’: ‘true’} が付く 次の質問のサジェストが出る gemini-embedding-2-preview が登場 Gemini Embedding 2 (2026-03-10) gemini-embedding-2-preview が登場。 マルチモーダルなエンベディングモデルで、ドキュメントの OCR や動画から音声トラックを抽出することも可能。 Cloud Storage に Rapid bucket が登場(GA) Rapid Bucket (2026-03-10) Cloud Storage に Rapid bucket が登場(GA)。 VM と同じゾーンにバケットを配置することで低レイテンシで高スループットを実現。AI/MLやデータ分析、ロギング、ストリーミング記録などに使用することを想定。 Cloud Run への IAP 認証の直接設定が一般公開(GA) Configure IAP for Cloud Run (2026-03-13) Cloud Run にロードバランサーなしで IAP 認証を設定する機能が Preview → 一般公開(GA)。 GA にあわせて組織外ユーザーの認証も可能になった。これまでは組織内ユーザーに限定されていた。 BigQuery でグローバルデフォルトロケーションが設定可能に Specify global settings (2026-03-16) BigQuery でグローバルデフォルトロケーションが設定可能になった。 組織レベルまたはプロジェクトレベルで適用できる。これまで場所の指定がない場合はシステム側の挙動に依存していたが、このアップデートにより、意図しないリージョンでのリソース作成や、場所指定漏れによるエラーを未然に防ぐことができる。 AlloyDB のエンハンストバックアップが一般公開(GA) Manage enhanced backups (2026-03-16) AlloyDB のエンハンストバックアップ(拡張バックアップ)が一般公開(GA)。 Backup and DR サービスと連携。一元化されたバックアップ管理プロジェクトで管理および保存される。 変更不可のストレージ: バックアップは、Backup and DR で管理される Backup Vault に保存される 保持の適用: ポリシーにより、バックアップの誤った削除や悪意のある削除を防ぐ 高度なスケジュール設定: バックアップの頻度と保持ルールを高度にカスタマイズできる 一元管理: AlloyDB、Cloud SQL、Compute Engine などの複数のGoogle Cloud ワークロードにわたって統一されたモニタリングとレポート BigQuery data preparation が Cloud Storage と Google ドライブに対応 BigQuery release notes - March 23, 2026 (2026-03-23) BigQuery の「データ準備(Data preparation)」が、Cloud Storage と Google ドライブからのデータ取り込みに対応。 Data preparation とは、AI によるデータエンジニアリング支援ツール。ファイルをテーブルに取り込んで変換等する作業などが簡単に実装できる。 Cloud NGFW(Enterprise tier)に URL フィルタリングが登場 URL filtering service overview (2026-03-24) Cloud NGFW(Enterprise tier)に URL フィルタリングが登場。 HTTP(S)トラフィックのドメイン / SNI 情報に基づいて URL フィルタリングを行う。 Cloud NGFW の Enterprise tier では、VPC にエンドポイントを構築し、パケットを横から検査する。エンドポイントの存在した時間に応じて料金が発生する。 参考 : Cloud Next Generation Firewall pricing Imagen モデルが廃止へ Vertex AI release notes - March 24, 2026 (2026-03-24) 画像生成モデル Imagen モデルが廃止へ。 推奨される後継モデルは gemini-2.5-flash-image (いわゆる Nano Banana 2)。以下のようなモデルが廃止される(一部抜粋)。 imagegeneration@006 imagetext@001 imagen-3.0-capability-002 imagen-3.0-fast-generate-001 imagen-3.0-generate-002 imagen-4.0-fast-generate-001 imagen-4.0-generate-001 imagen-4.0-ultra-generate-001 音楽生成モデル Lyria 3 が Vertex AI 経由で使用可能に(Public Preview) Lyria 3 (2026-03-25) 音楽生成モデル Lyria 3 が Vertex AI 経由で使用可能になった(Public Preview)。インプットにはテキストと画像が使える。 lyria-3-pro-preview : 184秒の音楽生成 lyria-3-clip-preview : 30秒の音楽生成 Vertex AI Search の回答モデルに gemini-3.1-pro、gemini-3-flash が登場 Answer generation model versions and lifecycle (2026-03-26) Vertex AI Search の回答モデルに新しいモデルが登場 gemini-3.1-pro-preview gemini-3-flash-preview なおこれまで使えていた Gemini 3 Pro Preview は使用不可になる。 AlloyDB と Cloud SQL で Conversational analytics が Preview 公開 Conversational analytics for Cloud SQL for PostgreSQL overview (2026-03-26) AlloyDB for PostgreSQL、Cloud SQL for PostgreSQL / MySQL で、Conversational analytics(会話型分析)が Preview 公開。 Google Cloud コンソールから自然言語を使って、テーブルに対するクエリを発行できる。BigQuery に続いて、運用データベースでも自然言語による DB へのクエリが可能になった。 TPU7x が提供開始 TPU7x (Ironwood) (2026-03-31) Google Cloud で新しい TPU「TPU7x」が提供開始。Google Kubernetes Engine(GKE)で使用可能。第7世代 TPU である Ironwood ファミリーであり、大規模 AI トレーニング・推論向け。 Google Workspace のアップデート Google Workspace の Gemini アプリの共有(公開リンク)が可能に Workspace admins can allow Gemini app conversation sharing for their organizations (2026-03-04) Google Workspace の Gemini アプリの共有(公開リンク)が可能になった。これまでは個人版のみ可能だった。 公開リンクは Google アカウントがなくても閲覧可。デフォルトで無効で、管理者設定で有効化の必要あり。許可するかどうかはセキュリティ上の考慮が必要。 Gemini in Chrome の対応地域が拡張(日本は未対応) Gemini in Chrome expands to more countries and languages, including Canada, New Zealand, and India (2026-03-13) Gemini in Chrome が従来の米国に加えて、カナダ、ニュージーランド、インドに対応。 Gemini in Chrome とは、開いているタブのコンテキストに基づいて記事の要約、解説、情報を検索したり、コンテンツの生成(テキスト・画像)、また Gemini Live などを使える機能のこと。 日本のユーザーは未対応なため注意。 NotebookLM にスライドの PPTX 形式エクスポートなどの機能追加 New ways to customize and interact with your content in NotebookLM (2026-03-20) NotebookLM に機能追加。 インフォグラフィックのスタイル指定 パワポ形式でのスライドのエクスポート スライド微調整 Cinematic Video Overviews (ただし英語のみ) ...など。 対象の Google Workspace エディションは全エディションだが、Cinematic Video Overviews のみ Business Standard 以上など。 Google Meet の会議の参加者承認時、疑わしいリクエストが分けて表示される Safeguarded guest admit flow in Google Meet (2026-03-24) Google Meetの会議の参加者承認にアップデート。信頼性が怪しい入室リクエストについては、通常のリクエストとは分けて表示されるようになる。 Gemini アプリでこれまでより長く3分間の音楽の生成が可能に Create longer musical tracks in the Gemini app with Lyria 3 Pro (2026-03-25) Gemini アプリで、新モデル Lyria 3 Pro により、3分間の音楽生成が可能になった。従来は30秒。 Google Workspace の Business Standard 以上のエディションで、2026年3月25日から数日かけてロールアウト。 Google Workspace に「ゲストアカウント」機能が登場 Introducing guest accounts: Collaborate securely and communicate with non-Workspace users in Google Chat (2026-03-30) Google Workspace に「ゲストアカウント」機能が登場。 外部の人を Google Chat 経由で招待すると、一意の ID が割り当てられたゲストアカウントが発行される。 Google ドライブ上のファイルの共同作業が可能(ファイル新規作成は不可)。Workspace Guests OU に配置され各種ポリシーを適用可能。 有償ライセンス数 × 5個のゲストアカウントが作成可能。 Google ドライブのランサムウェア検出が Beta 版 → 一般公開(GA) Ransomware detection and file restoration for Google Drive now generally available (2026-03-30) Google ドライブでランサムウェア検出&ファイル修復機能が Beta 版 → 一般公開(GA)。 デスクトップ版 Google ドライブを使っている場合に、ランサムウェアが検出されると、ファイル同期が停止され、管理者や本人にアラート通知。 万一、ドライブ上のファイルが暗号化されても一括復元が可能。 杉村 勇馬 (記事一覧) 執行役員 CTO 元警察官という経歴を持つ IT エンジニア。クラウド管理・運用やネットワークに知見。AWS 認定資格および Google Cloud 認定資格はすべて取得。X(旧 Twitter)では Google Cloud や Google Workspace のアップデート情報をつぶやいています。 Follow @y_sugi_it
はじめに こんにちは、KINTOテクノロジーズのFACTORY EC開発グループでバックエンドエンジニアをやっている、うえはら( @penpen_77777 )です。 今回はWebサービスを決められたレギュレーションの中で限界まで高速化を図るチューニングバトル「ISUCON」で得た知識を活用して、FACTORYでマスタデータ反映に1時間30分かかっていた処理をたった5分で終わらせるようにした方法についてご紹介します。 「ISUCON」は、LINEヤフー株式会社の商標または登録商標です。 ISUCON is a trademark or registered trademark of LY Corporation. https://isucon.net 今回の課題 FACTORYでは商品や車種などのマスタデータをExcelファイルに取りまとめ、 そのExcelファイルをもとに本番環境のDBにデータを反映しています(=マスタ反映)。 このマスタ反映に90分かかっており、マスタ運用作業のボトルネックになっていました。 例えば本番環境への反映の前に検証環境でマスタデータに問題ないかを確認しているのですが、 データの誤りに気づいて修正してもマスタ反映に90分かかるため、データが正しく直せたかどうかすぐに確認できない状況でした。 そこで、マスタ反映を高速化することで運用作業の効率化を図ることにしました。 マスタデータ反映 マスタ反映は、Excelで管理されているマスタデータを元に、最終的にマスタ反映コンテナがDBに書き込むという流れになっています。 上記の流れを図に示します。 図中では以下のような流れでマスタ反映が進みます。 マスタ運営担当者が、原本となるExcelファイルに車種や商品情報を入力する 出来上がったExcelファイルをマスタ管理ツールにアップロードする マスタ管理ツールがバリデーションをかけ、問題があれば担当者に通知する Excelがアップロードされると裏でLambda関数が実行され、ExcelファイルからCSVファイルに変換される DBに反映したい段階で、マスタデータをFACTORY本体に連携するため、CSVをレプリケーションバケットに保存する レプリケーションバケットにファイルが保存されるとFACTORY本体でステートマシンが起動し、マスタ反映コンテナを起動する マスタ反映コンテナがCSVを読み取ってSQLを組み立て、DBの各テーブルにレコードを読み書きする 今回高速化の対象としたのは、7のマスタ反映コンテナの処理です。 パフォーマンスチューニングをどのように進めたか追体験する 今回のマスタ反映に関するパフォーマンス問題についてどのように解決したかサンプルコードで見ていきましょう。 実際のマスタ反映処理はKotlinで記述されていますが、サンプルコードの方では筆者が慣れているGoを使います。 また、使用するマスタデータはFACTORYの実際に使われているデータではありません。 ですが、似た構造のマスタデータを使うので、実際に筆者が行ったパフォーマンスチューニングと同じ方法で高速化できます。 もしよろしければ皆さんも手を動かしながら試してみてください。 入力 ECサイトで管理している商品データを反映したいと考えてみましょう。 表では省略していますが、全部で50万件程度のデータとなります product_code 商品を一意に識別するコード product_name 商品の表示名 category_code 商品が属するカテゴリのコード supplier_code 仕入先コード status_code 商品の販売状態 unit_price 単価(円) P1001 ボールペン 黒 CAT01 SUP01 active 150 P1002 ボールペン 赤 CAT01 SUP01 active 150 P1003 シャープペンシル CAT01 SUP02 discontinued 300 P2001 A4コピー用紙 500枚 CAT02 SUP03 active 450 P2002 A3コピー用紙 500枚 CAT02 SUP03 active 780 人間にとって分かりやすいように表で示しましたが、システムにはcsvの形で入力されます。 product_code,product_name,category_code,supplier_code,status_code,unit_price P1001,ボールペン 黒,CAT01,SUP01,active,150 P1002,ボールペン 赤,CAT01,SUP01,active,150 P1003,シャープペンシル,CAT01,SUP02,discontinued,300 P2001,A4コピー用紙 500枚,CAT02,SUP03,active,450 P2002,A3コピー用紙 500枚,CAT02,SUP03,active,780 出力 入力されたデータを以下のように product テーブルに入れることにします。 category_codeやsupplier_codeやstatus_codeは外部テーブルで保持される値となるため、idに変換した上で保存されます。 外部テーブルにはすでにレコードが反映されているとします。 product_id product_code product_name category_id supplier_id status_id unit_price 1 P1001 ボールペン 黒 1 1 1 150 2 P1002 ボールペン 赤 1 1 1 150 3 P1003 シャープペンシル 1 2 2 300 4 P2001 A4コピー用紙 500枚 2 3 1 450 5 P2002 A3コピー用紙 500枚 2 3 1 780 erDiagram Product { string product_id PK "商品ID" string product_code UK "商品コード" string product_name "商品名" string category_id FK "カテゴリID" string supplier_id FK "仕入先ID" string status_id FK "ステータスID" int unit_price "単価(円)" } Category { string category_id PK "カテゴリID" string category_code UK "カテゴリコード" string category_name "カテゴリ名" } Supplier { string supplier_id PK "仕入先ID" string supplier_code UK "仕入先コード" string supplier_name "仕入先名" } Status { string status_id PK "ステータスID" string status_code UK "ステータスコード" string status_name "ステータス名" } Category ||--o{ Product : "has" Supplier ||--o{ Product : "supplies" Status ||--o{ Product : "applies" 改善前のコード サンプルコードの全体構成を以下の図に示します。 ハンズオンをサクッとできるようにテストデータの準備等の必要な作業を行ったのち、本題のマスタ反映が実行されるようになっています。testcontainersでMySQLコンテナを起動しテスト用のCSVを生成した後、main.goがそのCSVを読み取ってDBにマスタ反映を行います。 今回使用するサンプルコードを以下に示します。以下の4つのコードを同じディレクトリに配置してください。 :::details main.go (改善対象のコード) package main import ( "context" "fmt" "log" "os" "time" _ "github.com/go-sql-driver/mysql" "github.com/gocarina/gocsv" "github.com/jmoiron/sqlx" ) func main() { ctx := context.Background() // MySQLコンテナを起動 connStr, cleanup, err := startMySQLContainer(ctx) if err != nil { log.Fatal(err) } defer cleanup() db, err := sqlx.Open("mysql", connStr) if err != nil { log.Fatal(err) } defer db.Close() // テーブル・マスターデータを作成 if err := setupTables(db); err != nil { log.Fatal(err) } // サンプルCSVを生成(50万行) csvFilename := "data.csv" if err := generateSampleCSV(csvFilename, 500000); err != nil { log.Fatal(err) } // 1. CSVを読み取る file, err := os.Open(csvFilename) if err != nil { log.Fatal(err) } defer file.Close() var products []Product if err := gocsv.UnmarshalFile(file, &products); err != nil { log.Fatal(err) } fmt.Printf("CSV読み込み完了: %d 行\n", len(products)) importStart := time.Now() for i, product := range products { // 2. 読んでない行があれば1行読み取る、なければ終了 lineNum := i + 2 // 3. category_codeをcategory_idに変換 var category Category if err := db.Get( &category, `SELECT * FROM categories WHERE code = ?`, product.CategoryCode, ); err != nil { log.Fatalf("行 %d: category_code %q の検索に失敗: %v", lineNum, product.CategoryCode, err) } // 4. supplier_codeをsupplier_idに変換 var supplier Supplier if err := db.Get( &supplier, `SELECT * FROM suppliers WHERE code = ?`, product.SupplierCode, ); err != nil { log.Fatalf("行 %d: supplier_code %q の検索に失敗: %v", lineNum, product.SupplierCode, err) } // 5. status_codeをstatus_idに変換 var status Status if err := db.Get( &status, `SELECT * FROM statuses WHERE code = ?`, product.StatusCode, ); err != nil { log.Fatalf("行 %d: status_code %q の検索に失敗: %v", lineNum, product.StatusCode, err) } // 6. ProductRowに変換 row := ProductRow{ ProductCode: product.ProductCode, ProductName: product.ProductName, CategoryID: category.ID, SupplierID: supplier.ID, StatusID: status.ID, UnitPrice: product.UnitPrice, } // 7. UPDATE文を実行する result, err := db.NamedExec(` UPDATE products SET product_name = :product_name, category_id = :category_id, supplier_id = :supplier_id, status_id = :status_id, unit_price = :unit_price WHERE product_code = :product_code`, row, ) if err != nil { log.Fatalf("行 %d: productsの更新に失敗: %v", lineNum, err) } rowsAffected, err := result.RowsAffected() if err != nil { log.Fatalf("行 %d: 更新件数の取得に失敗: %v", lineNum, err) } // 8. UPDATE対象がなければINSERTする if rowsAffected == 0 { _, err = db.NamedExec(` INSERT INTO products (product_code, product_name, category_id, supplier_id, status_id, unit_price) VALUES (:product_code, :product_name, :category_id, :supplier_id, :status_id, :unit_price)`, row, ) if err != nil { log.Fatalf("行 %d: productsの登録に失敗: %v", lineNum, err) } } if (lineNum-1)%1000 == 0 { rate := float64(lineNum-1) / time.Since(importStart).Seconds() fmt.Printf("進捗: %d / %d 行 (%.0f 行/秒)\n", lineNum-1, len(products), rate) } // 9. 2に戻る } fmt.Printf("完了: %d 行 (所要時間: %v)\n", len(products), time.Since(importStart)) } ::: :::details models.go (csv, dbを操作するのに必要な構造体を定義) package main type Product struct { ProductCode string `csv:"product_code"` ProductName string `csv:"product_name"` CategoryCode string `csv:"category_code"` SupplierCode string `csv:"supplier_code"` StatusCode string `csv:"status_code"` UnitPrice int `csv:"unit_price"` } type Category struct { ID int `db:"id"` Code string `db:"code"` Name string `db:"name"` } type Supplier struct { ID int `db:"id"` Code string `db:"code"` Name string `db:"name"` } type Status struct { ID int `db:"id"` Code string `db:"code"` Name string `db:"name"` } type ProductRow struct { ProductCode string `db:"product_code"` ProductName string `db:"product_name"` CategoryID int `db:"category_id"` SupplierID int `db:"supplier_id"` StatusID int `db:"status_id"` UnitPrice int `db:"unit_price"` } ::: :::details setup.go(DB初期化・CSV生成) package main import ( "context" "encoding/csv" "fmt" "math/rand" "os" "strconv" "time" "github.com/jmoiron/sqlx" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/mysql" "github.com/testcontainers/testcontainers-go/wait" ) func startMySQLContainer(ctx context.Context) (connStr string, cleanup func(), err error) { mysqlContainer, err := mysql.Run(ctx, "mysql:8.0", mysql.WithDatabase("testdb"), mysql.WithUsername("user"), mysql.WithPassword("password"), testcontainers.WithWaitStrategyAndDeadline(3*time.Minute, wait.ForListeningPort("3306/tcp"). WithStartupTimeout(3*time.Minute), ), ) if err != nil { return "", nil, err } connStr, err = mysqlContainer.ConnectionString(ctx) if err != nil { _ = mysqlContainer.Terminate(ctx) return "", nil, err } cleanup = func() { _ = mysqlContainer.Terminate(ctx) } return connStr, cleanup, nil } func generateSampleCSV(filename string, rows int) error { file, err := os.Create(filename) if err != nil { return err } defer file.Close() writer := csv.NewWriter(file) defer writer.Flush() if err := writer.Write([]string{"product_code", "product_name", "category_code", "supplier_code", "status_code", "unit_price"}); err != nil { return err } categoryCodes := []string{"CAT01", "CAT02", "CAT03"} supplierCodes := []string{"SUP01", "SUP02", "SUP03"} statusCodes := []string{"active", "discontinued", "pending"} for i := 0; i < rows; i++ { record := []string{ fmt.Sprintf("P%d", 1000+i+1), fmt.Sprintf("商品_%d", i+1), categoryCodes[rand.Intn(len(categoryCodes))], supplierCodes[rand.Intn(len(supplierCodes))], statusCodes[rand.Intn(len(statusCodes))], strconv.Itoa(rand.Intn(10000) + 100), } if err := writer.Write(record); err != nil { return err } } return nil } func setupTables(db *sqlx.DB) error { tables := []string{ `CREATE TABLE IF NOT EXISTS categories ( id INT AUTO_INCREMENT PRIMARY KEY, code VARCHAR(10) UNIQUE NOT NULL, name VARCHAR(100) NOT NULL )`, `CREATE TABLE IF NOT EXISTS suppliers ( id INT AUTO_INCREMENT PRIMARY KEY, code VARCHAR(10) UNIQUE NOT NULL, name VARCHAR(100) NOT NULL )`, `CREATE TABLE IF NOT EXISTS statuses ( id INT AUTO_INCREMENT PRIMARY KEY, code VARCHAR(20) UNIQUE NOT NULL, name VARCHAR(100) NOT NULL )`, `CREATE TABLE IF NOT EXISTS products ( id INT AUTO_INCREMENT PRIMARY KEY, product_code VARCHAR(50) UNIQUE NOT NULL, product_name VARCHAR(255) NOT NULL, category_id INT NOT NULL, supplier_id INT NOT NULL, status_id INT NOT NULL, unit_price INT NOT NULL, FOREIGN KEY (category_id) REFERENCES categories(id), FOREIGN KEY (supplier_id) REFERENCES suppliers(id), FOREIGN KEY (status_id) REFERENCES statuses(id) )`, } for _, table := range tables { if _, err := db.Exec(table); err != nil { return err } } masterData := []string{ `INSERT IGNORE INTO categories (code, name) VALUES ('CAT01', '文房具'), ('CAT02', '食品'), ('CAT03', '電化製品')`, `INSERT IGNORE INTO suppliers (code, name) VALUES ('SUP01', '株式会社A商事'), ('SUP02', '株式会社B産業'), ('SUP03', '株式会社C物産')`, `INSERT IGNORE INTO statuses (code, name) VALUES ('active', '販売中'), ('discontinued', '販売終了'), ('pending', '販売準備中')`, } for _, data := range masterData { if _, err := db.Exec(data); err != nil { return err } } return nil } ::: :::details go.mod module csv-import-example go 1.24.5 require ( github.com/go-sql-driver/mysql v1.9.3 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/jmoiron/sqlx v1.4.0 github.com/testcontainers/testcontainers-go v0.40.0 github.com/testcontainers/testcontainers-go/modules/mysql v0.40.0 ) require ( dario.cat/mergo v1.0.2 // indirect filippo.io/edwards25519 v1.1.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/platforms v0.2.1 // indirect github.com/cpuguy83/dockercfg v0.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/docker v28.5.1+incompatible // indirect github.com/docker/go-connections v0.6.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/ebitengine/purego v0.8.4 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.10 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/go-archive v0.1.0 // indirect github.com/moby/patternmatcher v0.6.0 // indirect github.com/moby/sys/sequential v0.6.0 // indirect github.com/moby/sys/user v0.4.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/shirou/gopsutil/v4 v4.25.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/stretchr/testify v1.11.1 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel v1.38.0 // indirect go.opentelemetry.io/otel/metric v1.38.0 // indirect go.opentelemetry.io/otel/sdk v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect golang.org/x/crypto v0.43.0 // indirect golang.org/x/sys v0.38.0 // indirect google.golang.org/grpc v1.78.0 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) ::: 高速化するためにmain.goを改善していきます。 main.goの処理の流れをまとめると以下の通りです。 csvを読み取る product_code,product_name,category_code,supplier_code,status_code,unit_price P1001,ボールペン 黒,CAT01,SUP01,active,150 P1002,ボールペン 赤,CAT01,SUP01,active,150 ... 読んでない行があれば1行読み取る、なければ終了 P1001,ボールペン 黒,CAT01,SUP01,active,150 category_codeをcategory_idに変換 SELECT * FROM categories WHERE code = 'CAT01' -- => id=1, code='CAT01', name='文房具' supplier_codeをsupplier_idに変換 SELECT * FROM suppliers WHERE code = 'SUP01' -- => id=1, code='SUP01', name='株式会社A商事' status_codeをstatus_idに変換 SELECT * FROM statuses WHERE code = 'active' -- => id=1, code='active', name='販売中' ProductRowに変換 UPDATE文を実行する UPDATE products SET product_name = 'ボールペン 黒', category_id = 1, supplier_id = 1, status_id = 1, unit_price = 150 WHERE product_code = 'P1001' UPDATE対象がなければINSERTする INSERT INTO products (product_code, product_name, category_id, supplier_id, status_id, unit_price) VALUES ('P1001', 'ボールペン 黒', 1, 1, 1, 150) 2に戻る 実行してみる まずは現状を把握するため反映にどれくらい時間がかかるかみてみましょう。 testcontainersでMySQLコンテナを起動するため、事前にDocker Desktopを起動しておいてください。 また、依存パッケージを取得するために go mod tidy を実行してから go run . を実行します。 go mod tidy go run . このコードを実行してみると以下のような実行結果が得られます。 なんとDBへの反映に47分かかってしまいました。 $ go run . CSV読み込み完了: 500000 行 進捗: 1000 / 500000 行 (338 行/秒) 進捗: 2000 / 500000 行 (329 行/秒) 進捗: 3000 / 500000 行 (320 行/秒) 進捗: 4000 / 500000 行 (326 行/秒) 進捗: 5000 / 500000 行 (328 行/秒) 進捗: 6000 / 500000 行 (328 行/秒) 進捗: 7000 / 500000 行 (329 行/秒) 進捗: 8000 / 500000 行 (328 行/秒) 進捗: 9000 / 500000 行 (319 行/秒) ... 進捗: 500000 / 500000 行 (176 行/秒) 完了: 成功 500000 行, エラー 0 行 (所要時間: 47m23.503716s) 実際のFACTORYのマスタ反映の負荷状況 実際のFACTORYでの本番環境への反映では90分もの時間がかかっていました。 FACTORY本番のRDSでの負荷を計測するため、以下にDatabase Insightsの結果を示します。 図ではクエリ別にAAS(平均アクティブセッション)が示され、AASが高い順に並んでいます。 AASが高いほどDBに負荷がかかっており、低いほどDBに負荷がかかっていないというように解釈すればokです。 赤枠がマスタ反映時に実行されているSQLになりますが、 特定のテーブルに対するSELECTの実行回数が多い(1秒あたりに200回程度実行されている) SELECTよりも負荷は小さいものの、UPDATEも同程度の頻度で実行されている このように計測の結果、マスタ反映時に叩かれるSQL、特にSELECTが原因だなというように見当をつけ、改善を進めていきました。 原因を探る これだけの時間がかかる原因を探ってみましょう。 ここではコード中で実行されるクエリに着目してみます。 実行されているクエリは以下の通りです。 # クエリ ループ中(回) 合計(回) 1 SELECT * FROM categories WHERE code = ? 1 × 50万ループ = 50万 50万 2 SELECT * FROM suppliers WHERE code = ? 1 × 50万ループ = 50万 50万 3 SELECT * FROM statuses WHERE code = ? 1 × 50万ループ = 50万 50万 4 UPDATE products SET ... WHERE product_code = ? 1 × 50万ループ = 50万 50万 5 INSERT INTO products (...) VALUES (...) 最大1 × 50万ループ = 最大50万 最大50万 合計 最大250万 最大250万 1ループあたりの実行回数は少ないですが、今回はCSVが50万行あることから50万ループ実行され、最大で合計250万クエリ実行されることになります。 実行されるクエリが多いと、インデックスを貼って単体のクエリが高速にしたとしても、ちりつもで遅くなってしまいます。 特にDBは別サーバに分離されることが多く、ネットワークの通信帯域の影響も受けてしまいます。 なので高速化の方針としては実行されるクエリをいかに削減するかということを考えれば良さそうです。 実行されるクエリを削減するためには? SELECT編 実行されるクエリを削減するにはいくつかの手段がありますが、まずはオンメモリキャッシュを取り上げてみたいと思います。 オンメモリキャッシュは、時間のかかる処理の実行結果をあらかじめメモリ上に乗っけてしまい、結果が欲しい時にはメモリ上のデータから引っ張り出すことで高速化する手法です。ISUCONでは常套手段といっても良いほど典型的なパターンです。 今回でいくと時間のかかる処理とはDBへの問い合わせにあたります。 オンメモリでキャッシュするには、キャッシュ対象のデータが、キャッシュ中に書き換えられないほうが実装しやすいです。 キャッシュ中に実データに書き込みがある場合、キャッシュを書き込みに追随させるためデータの更新が必要になります。排他制御を考慮する必要があり、実装が困難になります。 productsテーブルを更新する際にはcategories, suppliers, statusesテーブルはすでに更新が完了しており、書き込みはありません。なのでproductsテーブルを更新する前にキャッシュしておけば問題なさそうです。 ということで先ほどのコードにキャッシュ処理を加えます。 CSV読み取り直後にSELECTを行い全件をメモリ上に載せます。 code→IDへ高速にデータを引きたいので、スライスではなくここでは map[string]int に載せてあげます。map型はキーにひもづくデータの取得で$O(1)$の計算量で高速にデータを引くことができます。 fmt.Printf("CSV読み込み完了: %d 行\n", len(products)) + // マスターデータをmapに読み込み(code → id) + var categories []Category + if err := db.Select(&categories, "SELECT * FROM categories"); err != nil { + log.Fatal(err) + } + categoryMap := make(map[string]int, len(categories)) + for _, c := range categories { + categoryMap[c.Code] = c.ID + } code→IDが欲しいタイミングで、先ほど定義したmap型の変数を使うように書き換えます // 3. category_codeをcategory_idに変換 - var category Category - if err := db.Get(&category, "SELECT * FROM categories WHERE code = ?", product.CategoryCode); err != nil { - log.Printf("行 %d: category変換エラー: %v", i+2, err) + categoryID, ok := categoryMap[product.CategoryCode] + if !ok { + log.Printf("行 %d: category変換エラー: code %q が見つかりません", i+2, product.CategoryCode) errorCount++ continue } 他の修正も加えると以下のような差分になります。 :::details オンメモリキャッシュ化の全体差分 diff --git a/main.go b/main.go index c3705d8..c3c16cf 100644 --- a/main.go +++ b/main.go @@ -52,6 +52,34 @@ func main() { } fmt.Printf("CSV読み込み完了: %d 行\n", len(products)) + // マスターデータをmapに読み込み(code → id) + var categories []Category + if err := db.Select(&categories, "SELECT * FROM categories"); err != nil { + log.Fatal(err) + } + categoryMap := make(map[string]int, len(categories)) + for _, c := range categories { + categoryMap[c.Code] = c.ID + } + + var suppliers []Supplier + if err := db.Select(&suppliers, "SELECT * FROM suppliers"); err != nil { + log.Fatal(err) + } + supplierMap := make(map[string]int, len(suppliers)) + for _, s := range suppliers { + supplierMap[s.Code] = s.ID + } + + var statuses []Status + if err := db.Select(&statuses, "SELECT * FROM statuses"); err != nil { + log.Fatal(err) + } + statusMap := make(map[string]int, len(statuses)) + for _, s := range statuses { + statusMap[s.Code] = s.ID + } + importStart := time.Now() for i, product := range products { @@ -59,41 +87,29 @@ func main() { lineNum := i + 2 // 3. category_codeをcategory_idに変換 - var category Category - if err := db.Get( - &category, - `SELECT * FROM categories WHERE code = ?`, - product.CategoryCode, - ); err != nil { - log.Fatalf("行 %d: category_code %q の検索に失敗: %v", lineNum, product.CategoryCode, err) + categoryID, ok := categoryMap[product.CategoryCode] + if !ok { + log.Fatalf("行 %d: category_code %q の検索に失敗", lineNum, product.CategoryCode) } // 4. supplier_codeをsupplier_idに変換 - var supplier Supplier - if err := db.Get( - &supplier, - `SELECT * FROM suppliers WHERE code = ?`, - product.SupplierCode, - ); err != nil { - log.Fatalf("行 %d: supplier_code %q の検索に失敗: %v", lineNum, product.SupplierCode, err) + supplierID, ok := supplierMap[product.SupplierCode] + if !ok { + log.Fatalf("行 %d: supplier_code %q の検索に失敗", lineNum, product.SupplierCode) } // 5. status_codeをstatus_idに変換 - var status Status - if err := db.Get( - &status, - `SELECT * FROM statuses WHERE code = ?`, - product.StatusCode, - ); err != nil { - log.Fatalf("行 %d: status_code %q の検索に失敗: %v", lineNum, product.StatusCode, err) + statusID, ok := statusMap[product.StatusCode] + if !ok { + log.Fatalf("行 %d: status_code %q の検索に失敗", lineNum, product.StatusCode) } row := ProductRow{ ProductCode: product.ProductCode, ProductName: product.ProductName, - CategoryID: category.ID, - SupplierID: supplier.ID, - StatusID: status.ID, + CategoryID: categoryID, + SupplierID: supplierID, + StatusID: statusID, UnitPrice: product.UnitPrice, } ::: DBに問い合わせる代わりにメモリ上のキャッシュにデータを問い合わせるため、 SELECTの150万回分がなくなり、残りのUPDATE/INSERTの最大100万回にまで削減できました。 # クエリ ループ前(回) ループ中(回) 合計(回) 1 SELECT * FROM categories 1 0 1 2 SELECT * FROM suppliers 1 0 1 3 SELECT * FROM statuses 1 0 1 4 UPDATE products SET ... WHERE product_code = ? 0 1 × 50万ループ = 50万 50万 5 INSERT INTO products (...) VALUES (...) 0 最大1 × 50万ループ = 最大50万 最大50万 合計 3 最大100万 最大100万3 これでどれくらい高速化できたか見てみましょう。 CSV読み込み完了: 500000 行 進捗: 1000 / 500000 行 (282 行/秒) 進捗: 2000 / 500000 行 (302 行/秒) 進捗: 3000 / 500000 行 (330 行/秒) 進捗: 4000 / 500000 行 (360 行/秒) 進捗: 5000 / 500000 行 (378 行/秒) (略) 進捗: 496000 / 500000 行 (409 行/秒) 進捗: 497000 / 500000 行 (409 行/秒) 進捗: 498000 / 500000 行 (409 行/秒) 進捗: 499000 / 500000 行 (407 行/秒) 進捗: 500000 / 500000 行 (405 行/秒) 完了: 成功 500000 行, エラー 0 行 (所要時間: 20m35.34731075s) 以上のように時間を半減させることができました。 INSERT/UPDATE編 SELECTの実行回数は削減できましたが、まだ100万回ものSQLが実行されています。 残りのINSERT/UPDATEの高速化にチャレンジしてみます。 INSERT/UPDATEの実行回数を削減する手段としてはupsertに変更することが挙げられます。 UPSERTとは UPSERTとはINSERTとUPDATEを組み合わせた単語で、INSERT時に対象レコードが存在しない場合はINSERTと、すでに存在する場合はUPDATEをかける処理です。 MySQLではINSERT ON DUPLICATE KEY UPDATEとREPLACE構文が使えますが、今回は前者の構文を使ってみます。 今回でいくと以下のUPDATE文を実行し、 UPDATE products SET product_name = ?, category_id = ?, supplier_id = ?, status_id = ?, unit_price = ? WHERE product_code = ? UPDATE対象が存在しなければINSERTを行っています。 INSERT INTO products ( product_code, product_name, category_id, supplier_id, status_id, unit_price ) VALUES (?, ?, ?, ?, ?, ?) INSERT ON DUPLICATE KEY UPDATEを使用すると2つのクエリを1つにまとめることができます。 INSERT INTO products ( product_code, product_name, category_id, supplier_id, status_id, unit_price ) VALUES (?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE product_name = VALUES(product_name), category_id = VALUES(category_id), supplier_id = VALUES(supplier_id), status_id = VALUES(status_id), unit_price = VALUES(unit_price) これだけで100万回→50万回までクエリの実行回数を削減できます。 # クエリ ループ前(回) ループ中(回) 合計(回) 1 SELECT * FROM categories 1 0 1 2 SELECT * FROM suppliers 1 0 1 3 SELECT * FROM statuses 1 0 1 4 INSERT INTO products (...) ON DUPLICATE KEY UPDATE ... 0 1 × 50万ループ = 50万 50万 合計 3 50万 50万3 コードでは以下のように修正しています :::details UPSERT化の差分 diff --git a/main.go b/main.go index c3c16cf..0da4db0 100644 --- a/main.go +++ b/main.go @@ -113,36 +113,23 @@ func main() { UnitPrice: product.UnitPrice, } - // 7. UPDATE文を実行する - result, err := db.NamedExec(` - UPDATE products - SET product_name = :product_name, - category_id = :category_id, - supplier_id = :supplier_id, - status_id = :status_id, - unit_price = :unit_price - WHERE product_code = :product_code`, + // 7. UPSERT(INSERT or UPDATE)を実行する + _, err := db.NamedExec(` + INSERT INTO products ( + product_code, product_name, category_id, supplier_id, status_id, unit_price + ) VALUES ( + :product_code, :product_name, :category_id, :supplier_id, :status_id, :unit_price + ) + ON DUPLICATE KEY UPDATE + product_name = VALUES(product_name), + category_id = VALUES(category_id), + supplier_id = VALUES(supplier_id), + status_id = VALUES(status_id), + unit_price = VALUES(unit_price)`, row, ) if err != nil { - log.Fatalf("行 %d: productsの更新に失敗: %v", lineNum, err) - } - - rowsAffected, err := result.RowsAffected() - if err != nil { - log.Fatalf("行 %d: 更新件数の取得に失敗: %v", lineNum, err) - } - - // 8. UPDATE対象がなければINSERTする - if rowsAffected == 0 { - _, err = db.NamedExec(` - INSERT INTO products (product_code, product_name, category_id, supplier_id, status_id, unit_price) - VALUES (:product_code, :product_name, :category_id, :supplier_id, :status_id, :unit_price)`, - row, - ) - if err != nil { - log.Fatalf("行 %d: productsの登録に失敗: %v", lineNum, err) - } + log.Fatalf("行 %d: productsのUPSERTに失敗: %v", lineNum, err) } if (lineNum-1)%1000 == 0 { ::: 実行してみましょう。 CSV読み込み完了: 500000 行 進捗: 1000 / 500000 行 (636 行/秒) 進捗: 2000 / 500000 行 (642 行/秒) 進捗: 3000 / 500000 行 (658 行/秒) 進捗: 4000 / 500000 行 (661 行/秒) 進捗: 5000 / 500000 行 (652 行/秒) (略) 進捗: 497000 / 500000 行 (650 行/秒) 進捗: 498000 / 500000 行 (650 行/秒) 進捗: 499000 / 500000 行 (650 行/秒) 進捗: 500000 / 500000 行 (650 行/秒) 完了: 成功 500000 行, エラー 0 行 (所要時間: 12m48.924974166s) この修正だけで10分程度まで早くすることができました。 bulk化する upsertに変更して50万回までSQLの実行回数を削減できました。 さらにSQLの実行回数を削減するためにSQLをbulk化してみます。 bulk化とはDBに対して複数のレコードに対する操作を1つのSQLにまとめて実行することを言います。 以下のUPSERT化したSQLはいまだ50万回叩かれています。 INSERT INTO products ( product_code, product_name, category_id, supplier_id, status_id, unit_price ) VALUES (?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE product_name = VALUES(product_name), category_id = VALUES(category_id), supplier_id = VALUES(supplier_id), status_id = VALUES(status_id), unit_price = VALUES(unit_price) このSQLを1行ずつ入れていくのではなく、ある程度のレコード数で固めてから送ることで SQLの実行回数を減らせるわけです。 今回は1000レコード分ずつSQLをまとめて送ることにしてみましょう。 すると500000/1000=500回までSQLの実行回数を削減できます。 # クエリ ループ前(回) ループ中(回) 合計(回) 1 SELECT * FROM categories 1 0 1 2 SELECT * FROM suppliers 1 0 1 3 SELECT * FROM statuses 1 0 1 4 INSERT INTO products (...) VALUES (...), (...), ... ON DUPLICATE KEY UPDATE ... 0 50万ループ / 1000 = 500 500 合計 3 500 503 どれくらい固めるかを表す数値をバッチサイズと呼びますが、この場合バッチサイズは1000となります。 :::details バルクUPSERT化の差分 diff --git a/main.go b/main.go index 0da4db0..daf2689 100644 --- a/main.go +++ b/main.go @@ -80,8 +80,8 @@ func main() { statusMap[s.Code] = s.ID } - importStart := time.Now() - + // code → id 変換してProductRowスライスを構築 + var rows []ProductRow for i, product := range products { // 2. 読んでない行があれば1行読み取る、なければ終了 lineNum := i + 2 @@ -104,16 +104,29 @@ func main() { log.Fatalf("行 %d: status_code %q の検索に失敗", lineNum, product.StatusCode) } - row := ProductRow{ + // 6. ProductRowに変換 + rows = append(rows, ProductRow{ ProductCode: product.ProductCode, ProductName: product.ProductName, CategoryID: categoryID, SupplierID: supplierID, StatusID: statusID, UnitPrice: product.UnitPrice, + }) + } + fmt.Printf("変換完了: %d 行\n", len(rows)) + + // バルクUPSERT(1000行ずつ) + const batchSize = 1000 + importStart := time.Now() + + for i := 0; i < len(rows); i += batchSize { + end := i + batchSize + if end > len(rows) { + end = len(rows) } + batch := rows[i:end] - // 6. UPSERT(INSERT or UPDATE)を実行する _, err := db.NamedExec(` INSERT INTO products ( product_code, product_name, category_id, supplier_id, status_id, unit_price @@ -126,17 +139,16 @@ func main() { supplier_id = VALUES(supplier_id), status_id = VALUES(status_id), unit_price = VALUES(unit_price)`, - row, + batch, ) if err != nil { - log.Fatalf("行 %d: productsのUPSERTに失敗: %v", lineNum, err) + log.Fatalf("バッチ %d-%d: UPSERTに失敗: %v", i+1, end, err) } - if (lineNum-1)%1000 == 0 { - rate := float64(lineNum-1) / time.Since(importStart).Seconds() - fmt.Printf("進捗: %d / %d 行 (%.0f 行/秒)\n", lineNum-1, len(products), rate) + if end%10000 == 0 || end == len(rows) { + rate := float64(end) / time.Since(importStart).Seconds() + fmt.Printf("進捗: %d / %d 行 (%.0f 行/秒)\n", end, len(rows), rate) } - // 8. 2に戻る } fmt.Printf("完了: %d 行 (所要時間: %v)\n", len(products), time.Since(importStart)) ::: では実行してみましょう。 CSV読み込み完了: 500000 行 変換完了: 500000 行 (エラー 0 行) 進捗: 10000 / 500000 行 (56843 行/秒) 進捗: 20000 / 500000 行 (72234 行/秒) 進捗: 30000 / 500000 行 (78721 行/秒) 進捗: 40000 / 500000 行 (73047 行/秒) 進捗: 50000 / 500000 行 (76230 行/秒) 進捗: 60000 / 500000 行 (78932 行/秒) 進捗: 70000 / 500000 行 (81193 行/秒) (略) 進捗: 460000 / 500000 行 (83997 行/秒) 進捗: 470000 / 500000 行 (83998 行/秒) 進捗: 480000 / 500000 行 (84197 行/秒) 進捗: 490000 / 500000 行 (83433 行/秒) 進捗: 500000 / 500000 行 (83642 行/秒) 完了: 成功 500000 行, エラー 0 行 (所要時間: 5.977838667s) わずか6秒程度で完了するようになりました! 元々50分かかっていた処理だと考えると、かなり高速化されたのではないかと思います。 改善後の実際のFACTORYでのDBの負荷状況 改善の結果を先述のDatabase InsightsのAASで確認してみましょう。 赤枠がマスタ反映時に実行されているSQLになりますが、 改善前に負荷がかかっているSQLとして挙げられていたSELECTがなくなって、ボトルネックを解消した INSERTはまだいるが実行回数が減り、AASも減った このように実際のFACTORYのDBの計測からも負荷が減ったことがわかります。 この改善の結果、5分程度で反映が終わるようになりました! 改善前は90分かかっていたと考えるとめちゃくちゃ高速化できました! まとめ 今回の改善の変遷をまとめると以下の通りです。 ステップ 施策 所要時間 SQL実行回数(最大) 改善前 - 47分 250万回 1. オンメモリキャッシュ SELECTをメモリ参照に置換 20分 100万回 2. UPSERT化 UPDATE+INSERTを1クエリに統合 13分 50万回 3. バルクUPSERT化 1000行ずつまとめて実行 6秒 500回 パフォーマンスチューニングでとった方法はどれもISUCONではよく出てくる典型的な対応策です。 まさかISUCONで培った知識を使って業務でこれほどまでの結果を出せるとは思いもしませんでした。 ISUCONは業務でも役に立ちます。 これからもISUCONで腕を磨きつつ、業務でのボトルネックを改善していきたいと考えています。
DBRE (DataBase Reliability Engineering)チームの taka-h です。 大規模なデータ更新や削除は、やりたいこと自体はSQLで表現できても、そのまま一度に実行すると運用上のリスクが高くなります。例えば大きなトランザクションが発生すると、レプリケーション遅延やDB負荷の増大、UNDOログの肥大化などにつながり、結果としてサービス影響を招く可能性があります。 そこで私たちは、UPDATE/DELETEのような「最終的にやりたい操作」をSQLに近い形で記述しつつ、実行時には安全な単位に分割して処理できる汎用ツールを実装しました。さらに、実行中に処理速度などの設定を変更できることや、監視結果に応じて自動で一時停止できることなど、実運用で必要になる制御も組み込んでいます。 本記事では、なぜこの問題が起きるのか、従来どのように回避してきたのか、そして今回のツールがどのように安全性と運用性を両立するのかを紹介します。最後に、ツールのREADMEも公開するので、同様の課題を持つ方が自分たちの環境に合わせて実装する際の叩き台として使えるはずです。 なおこのツールは、社内の次のようなデータベース運用の支援を前提とします。 データをアーカイブ/削除する データをバックフィルする データを一括で更新する 大規模データの更新/削除操作における課題 小規模なデータベースであれば、目的のSQLをそのまま実行しても問題にならないことがあります。一方で、一定以上の規模のデータを扱う場合は、同じSQLでも“そのまま一括実行する”こと自体がリスクになります。 主な理由は、処理対象が多いと大きなトランザクションが発生しやすく、その副作用がDB全体に波及するためです。具体的には、変更の伝播(レプリケーションなど)に遅延が発生したり、DBが高負荷になったり、UNDOログが肥大化して回復や性能に影響が出たりします。 このような場合の従来の方針は、「対象を小分けにして処理する」でした。たとえば、対象の主キーをある程度の件数に分割し、短いトランザクションを繰り返すようなSQLを作成してもらったり、専用の使い切りのスクリプトを都度用意して対応していました。 BEGIN; -- 対象の主キーを少量ずつ指定して処理する DELETE FROM items WHERE id IN (...); COMMIT; SLEEP ...; ただし、毎回使い切りのスクリプトを作ったり、対象主キーを取り出して分割したりするのは手間です。依頼者側に“安全な形のSQL”を組み立ててもらう必要が出るなど、運用コストが積み上がっていきます。 そこで、この問題に対して汎用的な解決策を提供するツールを実装しました。 解決策: 汎用化ツール このツールでは、利用者は「最終的に達成したい条件」をSQLに近い形で記述します。一方で実行時には、その条件に合致する対象を主キー単位で取得し、バッチに分割して短いトランザクションを繰り返すことで、安全にUPDATE/DELETEを進められるようにしています。 また、実運用では「削除や更新の進捗」とは独立に、DB全体が高負荷になったり、想定外の問題が発生したりします。そのため、状況に応じて処理速度や挙動を調整できること、そして必要なら自動的に一時停止できることが重要です。 この要件に対して本ツールでは、処理間隔やバッチサイズなどの設定を実行中に変更できる機能を持たせています。これは、MySQLのオンラインスキーマ変更ツールである gh-ost が「実行中に操作を制御できる」点で運用上便利なのと同じ発想です。さらに、監視結果に応じて自動で処理を一時停止する仕組みも組み込んでいます。 最終的なコンフィグ例は上図の通りです。実行したい条件(SQLに近い記述)と、どう安全に実行するか(運用上の関心事項)を分離して設定できます。また、processingに属する項目の多くは実行中に変更可能です。 このツールは主に生成AIを利用して実装し、動作確認のうえ社内で既に利用しています。コード自体のOSSとしての公開にはふみきれなかったのですが、次の章でこのツールのREADME.mdを公開します。これをご利用の環境に合わせた要件の追加、修正をしていただいた上で、生成AIを利用し同様のツールが利用できるようになることを期待しています。 もし試してみて有用だった点や改善アイデアがあれば、SNSなどで議論いただけると嬉しいです。また、「メルカリのDBREチームの公開したREADME.mdで作ってみた」ということで宣伝いただけるとありがたいです。 最後に、現在メルカリでは、この記事の発行者の所属する DBREチーム の EM(Engineering Manager) を募集しています。詳しくは こちら をご覧ください。 汎用データ更新ツールのREADME.md # data-updater A tool for batch data operations (UPDATE, DELETE, or NULL) on database records using primary keys with configurable conditions. ## Features - Cursor-based batch processing with configurable batch size - **Three operation types**: UPDATE, DELETE, and NULL (before_sql only) - **Parallel execution**: SELECT and UPDATE operations run concurrently for better performance - **Replica support**: Route SELECT queries to replica database to reduce primary load - **JOIN support**: Complex queries with multiple tables to identify target records - **Before SQL hooks**: Execute SQL before each batch (archiving, audit logging) - **Custom ORDER BY**: Process records in custom order - Interactive commands for runtime control (similar to gh-ost) - **YAML-based configuration**: All settings in a single configuration file - Real-time status monitoring with ETA - Pause/resume functionality - Dynamic configuration updates - Socket-based remote control interface - **Failed ID tracking**: Records failed updates and displays summary on exit - For batch-level failures: Records only first and last ID of the failed batch - For partial updates: Logs the discrepancy but doesn't track individual IDs - Writes detailed report to file if >100 failures - **Automatic resume**: Saves progress to status file after each batch - Automatically resumes from last successful position on restart - No need to manually track progress or specify resume points - Status files are adapter/table specific for multiple concurrent jobs ## Install ```bash go install github.com/xxx/cmd/data-updater ``` ## Quick Start 1. Create a configuration file: ```yaml # config.yaml database: host: localhost port: 3306 user: myuser password: mypassword database: mydatabase options: charset: utf8mb4 parseTime: "true" processing: batch_size: 1000 interval: 1s adapter: table_name: users pk_columns: - user_id update_sql: "status = 'processed', updated_at = NOW()" where_clause: "status = 'pending'" ``` 2. Run the tool: ```bash # Normal mode - executes updates data-updater --config config.yaml # Debug mode - SELECT only, no updates data-updater --config config.yaml --debug # Resume from specific ID data-updater --config config.yaml --resume-from "12345" # Show version data-updater -v ``` ## Operation Types The tool supports three operation types: ### UPDATE (default) Updates records matching the specified conditions. ```yaml adapter: table_name: users pk_columns: ["user_id"] operation: update # or omit (default) update_sql: "status = 'processed', updated_at = NOW()" where_clause: "status = 'pending'" ``` ### DELETE Deletes records matching the specified conditions. **Important**: The DELETE operation permanently removes data. Always test with --debug mode first. ```yaml adapter: table_name: old_logs pk_columns: ["id"] operation: delete where_clause: "created_at < '2023-01-01'" ``` ### NULL Executes only before_sql without UPDATE or DELETE. Useful for archiving, copying, or transforming data. ```yaml adapter: table_name: items pk_columns: ["id"] operation: "null" before_sql: | INSERT INTO archived_items (id, name, created_at, archived_at) SELECT id, name, created_at, NOW() FROM items WHERE id IN (?) where_clause: "status = 'inactive'" ``` ## Configuration All settings are managed through a YAML configuration file: ### Database Configuration ```yaml database: host: localhost # Database host (default: localhost) port: 3306 # Database port (default: 3306) user: myuser # Database user (required) password: mypassword # Database password (required) database: mydatabase # Database name (required) options: # MySQL connection options (optional) charset: utf8mb4 parseTime: "true" loc: UTC timeout: 30s # Replica configuration (optional) replica_host: replica-db.example.com # SELECT queries go here replica_port: 3306 # Defaults to primary port replica_user: replica_user # Defaults to primary user replica_password: replica_password # Defaults to primary password ``` When replica_host is configured: - SELECT queries (fetching PKs, COUNT) are routed to replica - UPDATE/DELETE operations always use primary - SELECT FOR UPDATE (pessimistic locking) uses primary ### Processing Configuration ```yaml processing: batch_size: 1000 # Number of rows per batch interval: 1s # Time between batches (e.g., 1s, 500ms, 2m) debug_mode: false # Log queries without executing updates pipeline_buffer: 1 # Buffer size for parallel SELECT/UPDATE pessimistic_locking: true # Use SELECT FOR UPDATE (default: true) lock_retry_count: 3 # Number of lock acquisition retries ``` ### Adapter Configuration ```yaml adapter: table_name: users # Target table (required) table_alias: u # Alias for main table (required when using joins) pk_columns: # Primary key column(s) (required) - user_id operation: update # "update" (default), "delete", or "null" update_sql: "status = 'processed'" # SET clause (required for update) before_sql: "..." # SQL to execute before operation (required for null) where_clause: "status = 'pending'" # Additional WHERE (optional) join_clause: "..." # JOIN statements (optional) order_by: "created_at" # Custom ORDER BY (optional, defaults to PK) ``` ### Interactive Control ```yaml interactive: enabled: true # Enable socket-based control socket_path: "/tmp/data-updater.sock" # Unix socket path ``` ### Status File (Automatic Resume) ```yaml status_file: enabled: true # Enable automatic resume path: "/var/lib/status" # Custom path (optional) ``` ## Advanced Features ### JOIN Support Use JOINs for complex queries that need to reference multiple tables: ```yaml adapter: table_name: items table_alias: i pk_columns: ["id"] operation: delete join_clause: | LEFT JOIN transaction_evidences te ON te.item_id = i.id where_clause: | i.status = 'cancel' AND te.id IS NULL ``` **How it works:** 1. SELECT query uses JOINs + WHERE to fetch PKs 2. DELETE/UPDATE query only uses primary keys (no JOINs) ### Before SQL (Pre-operation Hook) Execute SQL before each batch within the same transaction: ```yaml adapter: table_name: items pk_columns: ["id"] operation: delete before_sql: | INSERT INTO deleted_item_ids (id, created, deleted) SELECT id, created, NOW() FROM items WHERE id IN (?) where_clause: "status = 'cancel'" ``` **Notes:** - Use IN (?) placeholder - expanded to all PKs in the batch - For composite keys: (col1, col2) IN (?) - Executed atomically with the main operation - If before_sql fails, entire transaction is rolled back ### Custom ORDER BY Process records in a specific order: ```yaml adapter: table_name: items table_alias: i pk_columns: ["id"] order_by: "i.created, i.id" ``` ### Understanding update_sql The update_sql parameter specifies the SET clause. **Do not include trailing semicolons.** ```yaml # Simple status update update_sql: "status = 'processed'" # Results in: UPDATE users SET status = 'processed' WHERE user_id IN (...) # Multiple columns update_sql: "status = 'archived', archived_at = NOW()" # Using CASE statements update_sql: | status = CASE WHEN last_login < NOW() - INTERVAL 30 DAY THEN 'inactive' ELSE 'active' END ``` **Important**: - Do NOT include UPDATE keyword, table name, or WHERE clause - The tool automatically adds WHERE pk IN (...) for batch updates ### Using where_clause for Idempotent Operations Make updates safe to run multiple times: ```yaml adapter: update_sql: "status = 'processed', processed_at = NOW()" where_clause: "status = 'pending'" # Results in: UPDATE users SET ... WHERE user_id IN (...) AND status = 'pending' ``` ## Command Line Options - --config, -c : Path to YAML configuration file (required for operation) - --debug, -d : Enable debug mode (SELECT only, no updates) - --resume-from : Manual resume from specific primary key(s) - --total-rows : Skip initial COUNT query and use provided value (e.g., --total-rows 1000000 ). Also used as a stop condition based on rows_handled (rows selected), not rows_processed (rows affected by UPDATE) - --pk-source : Read PKs from file/directory instead of table (local path or gs://bucket/path ) - --version, -v : Show version information - --help, -h : Show help message ## Interactive Commands Control the tool via Unix socket: ```bash # Show status echo "status" | nc -U /tmp/data-updater.sock # Pause/resume processing echo "pause" | nc -U /tmp/data-updater.sock echo "resume" | nc -U /tmp/data-updater.sock # Change batch size echo "batch-size 5000" | nc -U /tmp/data-updater.sock # Change interval echo "interval 500ms" | nc -U /tmp/data-updater.sock # Show help echo "help" | nc -U /tmp/data-updater.sock # Auto-interval: show status / enable / disable / set min echo "auto-interval" | nc -U /tmp/data-updater.sock echo "auto-interval on" | nc -U /tmp/data-updater.sock echo "auto-interval off" | nc -U /tmp/data-updater.sock echo "auto-interval min 200ms" | nc -U /tmp/data-updater.sock ``` ## Debug Mode Debug mode allows you to verify queries without executing updates: ```bash data-updater --config config.yaml --debug ``` Example output: ``` INFO DEBUG: UPDATE query that would be executed query="UPDATE users SET status = 'processed' WHERE user_id IN (?,?,?)" args_count=3 primary_keys_count=3 ``` ## Resume Feature ### Automatic Resume (Default) - Progress saved after each successful batch - On restart, automatically resumes from last position - Status files named: data-updater-{table}-{adapter}.status ### Manual Resume ```bash # Single primary key data-updater --config config.yaml --resume-from "12345" # Composite primary key data-updater --config config.yaml --resume-from "tenant1,12345" ``` ### Resume Priority 1. Manual --resume-from (highest) 2. Status file (if exists) 3. Adapter's initial cursor (default) ### Skip COUNT Query Use --total-rows to skip the initial COUNT query: ```bash # Useful for large tables or retries where you know the total data-updater --config config.yaml --total-rows 1000000 ``` This is particularly useful when: - Retrying after interruption (you already know the count) - Large tables where COUNT(*) is expensive - Faster startup when exact count is not critical **Stop condition:** --total-rows stops the selector after handling (selecting) that many rows. The stop check uses rows_handled , not rows_processed . This means it works correctly even when UPDATE affects 0 rows (e.g., records already deleted by another process or filtered out by where_clause ). ## PK Source (Read PKs from File) Read primary keys from a file instead of the database table. **Important:** --total-rows is required when using --pk-source for accurate progress/ETA calculation. ```bash # Count lines first wc -l failed-ids.txt # 1500 failed-ids.txt # From local file (--total-rows is required) data-updater --config config.yaml --pk-source "./failed-ids.txt" --total-rows 1500 # From local directory (processes all files) data-updater --config config.yaml --pk-source "./failed-ids/" --total-rows 5000 # From GCS file data-updater --config config.yaml --pk-source "gs://bucket/failed-ids.txt" --total-rows 1500 # From GCS directory data-updater --config config.yaml --pk-source "gs://bucket/failed-ids/" --total-rows 10000 ``` Or configure in YAML: ```yaml pk_source: path: "gs://my-bucket/failed-ids/" gcs_project: "my-gcp-project" # Required for GCS paths skip_header: true # Skip first line (for BQ exports with header) prefetch_buffer: 5 # Number of GCS files to prefetch ahead (default: 5) ``` **GCS Authentication:** GCS access uses Application Default Credentials (ADC). Set up with: ```bash gcloud auth application-default login gcloud auth application-default set-quota-project <project> ``` **File format (CSV):** ``` # Comments starting with # are ignored 12345 12346 tenant1,12345 "value,with,comma",12346 ``` **Skip header (for BigQuery exports):** BigQuery exports include a header row with column names. Use skip_header: true to skip it: ```csv id 12345 12346 ``` **Features:** - Files are read line by line (streaming) to minimize memory usage - GCS files are prefetched in the background to eliminate download latency (configurable buffer, default 5) - Directory support: processes all files in sorted order - Resume support: tracks progress per file and line number - Can be combined with where_clause to filter PKs from file ## Status Metrics Status logs and the status interactive command report two counters: - ** rows_processed **: rows successfully affected by the UPDATE/DELETE operation (i.e., the database reported a row change) - ** rows_handled **: rows selected and sent through the pipeline, regardless of whether the UPDATE/DELETE actually modified the row. This counter is used for progress percentage and ETA calculations When rows_handled is higher than rows_processed , it typically means some rows were already in the desired state (e.g., already deleted or already updated by a previous run). ## Hibernate (Health-Check Based Pause) The hibernate feature allows the processor to periodically run an external health-check script. If the script returns a non-zero exit code (indicating a problem), the processor pauses for a configurable period, then automatically resumes. ### Configuration ```yaml processing: hibernate_script_path: "/path/to/check.sh" hibernate_pause_period: 30s hibernate_check_interval: 15s ``` - hibernate_script_path : Path to an executable script. The script is run at the configured check interval (default 15s). Exit code 0 means healthy; any non-zero exit code triggers hibernation. - hibernate_pause_period : How long the processor pauses when the script signals a problem. Required when hibernate_script_path is set. - hibernate_check_interval : How often the health-check script is executed. Defaults to 15s . ### Behavior 1. The health-check script is executed at the configured interval (default 15s) while the processor is running 2. If the script exits with code 0, processing continues normally 3. If the script exits with a non-zero code, the processor pauses for hibernate_pause_period , then automatically resumes 4. The hibernation_count metric tracks the total number of times hibernation was triggered (visible in status command output and periodic logs) ### Use Cases - Pause when database replication lag exceeds a threshold - Pause when disk space is low - Pause during maintenance windows - Any custom operator-defined health check ## Hourly Summary Log For long-running jobs, you can enable an hourly summary log that writes JSON entries to a dedicated file. A final summary is also written on shutdown, so short-lived runs still produce a report. ```yaml processing: hourly_log_path: "/var/log/data-updater/hourly.log" ``` Each JSON line includes: - rows_processed_total / rows_processed_delta — records processed in total and during the period - rows_failed_total / rows_failed_delta - hibernation_count_total / hibernation_count_delta - total_rows , rows_remaining , progress — overall progress - interactive_commands — commands issued via socket during the period (with timestamps) - summary_type — "hourly" or "final" If hourly_log_path is not set, the reporter is not started. ## Auto-Interval Adjustment Automatically adjusts the processing interval based on the hibernation ratio observed each hour. When many hibernate checks fail (high ratio), the interval increases (slows down). When the ratio is low, the interval decreases (speeds up). ```yaml processing: auto_interval_enabled: true auto_interval_high_ratio: 0.3 # ratio >= this → slow down (default: 0.3) auto_interval_low_ratio: 0 # ratio <= this → speed up (default: 0) auto_interval_increase_factor: 1.25 # multiply interval by this to slow down (default: 1.25) auto_interval_decrease_factor: 0.8 # multiply interval by this to speed up (default: 0.8) auto_interval_min: 200ms # floor for interval (default: initial interval) auto_interval_max: 30s # ceiling for interval (default: 10x min) ``` Auto-interval can be toggled at runtime via socket commands ( auto-interval on/off ). See [Interactive Commands](#interactive-commands). ## Pessimistic Locking Prevent concurrent modifications with pessimistic locking: ```yaml processing: pessimistic_locking: true # default lock_retry_count: 3 ``` Transaction pattern: ```sql BEGIN; SELECT ... FOR UPDATE WHERE ID IN (...); UPDATE ... WHERE ID IN (...); COMMIT; ``` - MySQL 8.0+: Uses NOWAIT clause - Sets innodb_lock_wait_timeout=1 to minimize lock wait ## Environment Variables Use environment variables for sensitive data: ```yaml database: host: "${DB_HOST}" user: "${DB_USER}" password: "${DB_PASSWORD}" database: "${DB_NAME}" ``` ## Examples See the examples/ directory for complete configuration files: - minimal-config.yaml : Bare minimum configuration - full-config.yaml : All available options with comments - production-config.yaml : Production-ready configuration - complex-update.yaml : Complex SQL with CASE statements - multiline-example.yaml : Multi-line SQL using YAML block scalars - update-sql-examples.yaml : Various update_sql patterns ## Production Tips 1. **Use environment variables** for sensitive data 2. **Enable status files** for automatic resume 3. **Set appropriate intervals** to avoid overwhelming the database 4. **Use pessimistic locking** for critical data consistency 5. **Configure replica** to offload SELECT queries from primary 6. **Test with debug mode** before running DELETE operations 7. **Use before_sql** to archive data before deletion ## Troubleshooting ### Common Issues 1. **Permission denied on socket**: Check socket path permissions 2. **Resume not working**: Verify status file path and permissions 3. **Slow processing**: Increase batch size or decrease interval 4. **Lock timeouts**: Enable pessimistic locking or increase retry count ## License See LICENSE file in the repository root.

動画

書籍