TECH PLAY

Microservices

イベント

該当するコンテンツが見つかりませんでした

マガジン

技術ブログ

こんにちは。Data Ingestion チームでData Engineerをしている @orfeon です。この記事は「 Merpay & Mercoin Tech Openness Month 2026 」の14日目の記事です。 はじめに Data Ingestion(旧Data Platform)チームでは、多数のマイクロサービスが管理する データベース・テーブル から、大量のデータを継続的にDWH(データウェアハウス)へ同期する必要があります。同期対象には数億〜 数百億件 に達する大規模なテーブルも含まれ、これらをいかに速く・安全に・一貫性を保ったまま抽出するかが、DWHの鮮度や安定性にとって大事になります。 これまで  Cloud Spanner  からのデータ取得では、Spannerの分散DB特有の機能(後述)を活用することで、大規模テーブルでも高いスループットでの取得を実現できていました。 一方、社内にはTiDBやAlloyDBといったSpanner以外のデータベースも多く利用されており、その中には数百億件以上に達するテーブルもあります。 これらのテーブルは従来、主キーなどで シーク方式 で取得していましたが、単一コネクションでの シーケンシャルなデータ取得 になるため、大規模テーブルでは取得に非常に時間がかかっていました。 そこで今回、Spannerと同じように、 それぞれのDBに特有の機能を活用して並列取得などでスループットを上げる よう工夫しました。 具体的には、 TiDB  と  AlloyDB  の大規模テーブルをDWHへ同期する仕組みを  Cloud Dataflow(Apache Beam)  上に構築しました。 本記事では、その中核となる2つのSourceモジュール   TiDBSource  と  PostgresSource   について、高いスループットを実現するための工夫を解説します。 なぜ汎用JDBCではなく専用モジュールなのか Beam/Dataflowには汎用的な  JdbcIO  が既に存在します。 しかし汎用JDBCは「 SELECT を実行して結果を1行ずつ読む」という標準的な経路をたどるため、大規模テーブルでは以下のボトルネックが発生します。 1行ごとのSQL処理オーバーヘッド : 通常のクエリ実行では、サーバ側でのタプルのテキスト/プロトコル変換などが行ごとに発生する。 並列化の難しさ : テーブルを並列に読むには「どこで分割するか」を決める必要があるが、 OFFSET ベースの分割はオフセットが大きくなるほど遅くなり、フルスキャンを誘発する。 一貫性の確保 : 並列に複数コネクションから読む場合、各コネクションが別々の時点を読むと整合性が崩れる。 そこで今回のモジュールでは、 それぞれのデータベースが持つネイティブなバルク転送機構と物理的なデータ配置情報を活用 し、汎用JDBCのボトルネックを回避する設計にしました。 加えて運用上の大きなメリットとして  分割キー(フィルタ条件)の自動抽出  があります。 マイクロサービスごとに膨大なテーブルを扱う環境では、テーブル1つひとつに対して「どのカラムで分割するか」を人手で指定するのは現実的ではありません。 両モジュールはテーブルのメタデータから 主キー(PK)や暗黙の行ID、物理ブロック位置を自動で見つけ出し 、分割範囲の絞り込み条件を組み立てます。 利用者は接続先とテーブル名を指定するだけで、同じ設定が多数のテーブルに横展開することができます。 なぜCloud Spannerでは高いスループットでデータ取得が可能なのか 今回の設計の発想は、既にうまくいっていたSpannerからの取得方法を、TiDBやAlloyDBにも持ち込むことにありました。 そこでまずSpannerが大規模テーブルでも高いスループットを出せている理由を説明します。 Spannerは分散データベースとして、以下の機能を組み合わせています。 PartitionQuery / PartitionRead(Splitベースの自動分割) : Spannerはデータを内部的に  Split (キー範囲+負荷ベース)へ分割して保持しています。 PartitionQuery  はこのSplit境界に基づいてクエリを複数のパーティションに自動分割します。クライアントはキー範囲などSplitの内部構造を意識する必要がありません。 BatchReadOnlyTransaction(スナップショット一貫性) : 全パーティションの読み取りが、 TimestampBound  で指定した同一スナップショットを参照することを保証します。ロックを取らずに一貫した読み取りができます。 Partition Tokenの分散・並列実行 : 分割結果は シリアライズ可能なPartition Token として返されるため、複数プロセス・複数マシン、そして Beam Worker に配布してそのまま並列実行できます。Apache Beamの  SpannerIO  も内部でこの仕組みを使っています。 Partition Tokenによる自動バージョン保持 : Tokenが有効な間は対象バージョンがGCされないことが保証されるため、クライアント側で明示的なバージョン保護(SafePoint管理)が不要です。 Data Boost(Spanner固有) : Google管理の独立した計算リソースで読み取るオプションで、 本番ワークロードへの影響をほぼゼロ にしつつ弾力的にスケールできます。 これらは「 物理的なデータ配置に沿った自動分割 」「 スナップショット一貫性 」「 分割単位の分散ワーカーへの配布と並列実行 」という構図で成り立っています。Spannerではこれらが高度に抽象化されたAPIとして提供されていますが、 TiDBやAlloyDB(PostgreSQL)にもそれに近いDB固有の機能が存在します 。 このSpannerの機能とTiDBやPostgreSQLの機能は以下のように対応します。 Spanner TiDB(dumpling相当) AlloyDB(PostgreSQL) PartitionQuery(Split境界で自動分割) TABLESAMPLE REGIONS() (TiKV Region境界) ctid 物理ブロック範囲( pg_relation_size ) BatchReadOnlyTransaction(スナップショット) tidb_snapshot MVCC(Multi-Version Concurrency Control) + TSO(Timestamp Oracle) バッチ読み取り( ctid スナップショットのズレは許容) Partition Tokenの分散実行 Range条件の分散実行(本記事の設計) Range条件の分散実行(本記事の設計) Partition Tokenによる自動GC保護 tidb_gc_life_time  の引き上げで代替 (該当なし) SpannerのSpannerIOで提供されている「分割 → 配布 → 並列スナップショット読み取り」を、TiDB/AlloyDBではDB固有の機能を組み合わせて自前で構築する、というのが本記事のモジュールの狙いです。 以降その共通の仕組みと各DB向けの実装を見ていきます。 共通アーキテクチャ 両モジュールに共通する基本戦略は次の3ステップです。  TiDBSource / PostgresSource はCloud Dataflow バッチジョブとして実行され、以下3つのステップで役割が分かれます。 テーブルの範囲分割 : 1本のコネクションでメタデータだけを取得し、テーブルを物理的な分割単位(Range)のリストに列挙する 再分配 : 分割単位をPCollectionの「種」として撒き、 Reshuffle でワーカーに再分配する 並列読み込み : 各ワーカーが担当Rangeをネイティブのバルク転送機構で並列に読み取る 以降、TiDBとPostgreSQLそれぞれについて、この3ステップの中身を掘り下げます。まずTiDBから、この3つのステップがどのように実装されるかを見ていきます。 TiDBテーブルからのデータ抽出 TiDB公式ツール dumpling に学ぶ TiDBには  dumpling  という高速なエクスポートツールが公式に提供されています。 TiDBSource の設計は、このdumplingが高スループットを実現している仕組みを参考にしています。 まずはdumpling側の要点を整理します。 テーブルのチャンク分割と並列読み取り dumplingは、1テーブルを丸ごと1クエリで読むのではなく、テーブルをチャンク(範囲)に分割し、各チャンクを独立したSELECTクエリとして並列実行します。 チャンク分割は3段階のフォールバック構造になっています。 戦略 方式 概要 A(最優先) TiKV Regionベース分割 TABLESAMPLE REGIONS()  でRegion境界をチャンク境界にする B(フォールバック) 数値インデックスベース分割 数値型PK/インデックスのMIN/MAXから均等分割 C(最終) テーブル全体ダンプ 分割可能なフィールドがない場合は1クエリ 特に重要なのが戦略Aです。 TiDBではデータがTiKV上で Region(デフォルト96MB単位) に分散配置されます。 dumplingはこのRegion境界をそのままチャンク境界として利用するため、各チャンクが異なるTiKVノードへの読み取りリクエストに分散され、クラスタ全体のI/O帯域を引き出せます。 dumplingの並列実行の仕組み: Producer-Consumer 分割したチャンクを並列に読み出すために、dumplingは内部で Producer-Consumer という構造をとります。登場人物は次の3つです(いずれもdumplingの実装に出てくる用語です)。 Producer(プロデューサ) : テーブルをチャンクに分割し、「このチャンクを読め」というタスクを作り続ける係。dumplingではメインのgoroutineが担当します。先ほどのRegion境界などをもとにタスクを生成します。 Writer(ライター) : 生成されたタスクを受け取り、実際にSELECTを発行してデータを読み出す係。 --threads で指定した数だけ並列に動き、それぞれが独立したDB接続を持ちます。タスクを消費するConsumer側にあたります。 infiniteChan(無制限チャネル) : ProducerとWriterの間をつなぐ、容量に上限のないキュー(待ち行列)。Writerの処理が詰まってもProducerがブロックされず、生成済みのタスクをいくらでも貯めておけます。 このように、タスクを作成する人(Producer)とタスクを実行する人(Writer)を分離し、その間を待ち行列(infiniteChan)でつなぐことで、分割と読み取りを互いに待たせずに並列で回す基本構造です。後述のTiDBSourceは、この役割分担をそのままDataflowの分散モデルに置き換えています。 Snapshot読み取り dumplingはTiDBのMVCC (Multi-Version Concurrency Control)機構を利用し、特定のTSO(Timestamp Oracle)時点の  スナップショット  から一貫したデータを読み取ります。 ロック不要 :  FLUSH TABLES WITH READ LOCK  のような排他ロックが不要で、書き込みをブロックしない。 一貫性保証 : 全Writerが同一時点のデータを読むため整合性が保たれる。 高スループット : ロック競合がないため並列度を上げられる。 加えてdumplingは、長時間のダンプ中にTiDBのGC(Garbage Collection)がスナップショット時点の古いバージョンを回収しないよう、PD(Placement Driver)に対してGC SafePointを登録します。 TiDBの機能を活用したDataflowでの実装 TiDBSource は、dumplingのこれらのアイデアを Apache Beam / Dataflowのモデルに移植 したものです。dumplingがgoroutineで実現していた並列性を、Dataflowの分散ワーカーによる並列性に置き換えています。対応関係は次の通りです。 dumpling TiDBSource (Dataflow) Producerがチャンクタスクを生成 パイプライン構築時に Range のリストを生成 infiniteChan  + 複数Writer goroutine Reshuffle  +  ParDo による分散ワーカー並列処理 各Writerが独立DB接続でSELECT 各ワーカーが @Setup で独自コネクションを確立 TSOスナップショット読み取り TSOを一度取得し全ワーカーに配布 ステップ1: 分割キーの決定とRangeの列挙 パイプラインの初期起動時に1本のコネクションを張り、出力スキーマの確定・スナップショットTSOの取得・テーブルの分割を行います。 ここではメタデータと境界値だけを読み、実データのスキャンは行いません。 分割キーの自動解決  は次の優先順位で行われます。利用者がカラムを指定しなくても、テーブルのメタデータから自動的に決定されます。 利用者が  splitField  を明示指定していればそれを使う なければ 単一カラムの主キー(PK) それもなければ 暗黙の行ID  _tidb_rowid (クラスタードキーを持たないテーブル向け) _tidb_rowid  は、明示的な主キーを持たないテーブルでTiDBが内部的に振る暗黙の行IDです。 これを分割キーに使えるため、主キー設計に依存せず、どんなテーブルでも分割の足がかりを得られます。 Rangeの列挙  は、先述のdumplingの戦略A→B→Cと同じ3段階フォールバックで行います。 戦略Aは、次のSQLでTiKVのRegion境界を取り出します。 SELECT `pk` FROM table TABLESAMPLE REGIONS() ORDER BY `pk` TABLESAMPLE REGIONS() は各Regionの先頭行を返すため、結果の各値が「次のチャンクの下限」になります。 境界値の列 b[1], b[2], …, b[n] から、隣り合う境界で挟まれた半開区間を生成します。 取りこぼしを防ぐため、最初の区間は下側を、最後の区間は上側を開いておきます。 chunk[ -∞, b[1] ), chunk[ b[1], b[2] ), …, chunk[ b[n], +∞ ) TABLESAMPLE REGIONS() はTiDB v5.0以降の構文です。 非TiDBのMySQLや古いTiDBではこのクエリが失敗するため、自動的に戦略B(数値MIN/MAX均等分割)へフォールバックします。 戦略Bは、SELECT MIN(pk), MAX(pk) で取得した範囲を、推定行数とチャンクあたりの目標行数 splitSize から決めた個数で等分します。 chunks = ⌈ 推定行数 / splitSize ⌉ step = (max − min) / chunks + 1 区間 = [min, min+step), [min+step, min+2·step), … , [ …, max] (stepの計算では厳密な切り上げ ⌈(max−min)/chunks⌉ ではなく+1 としています。半開区間 [cutoff, cutoff+step) で走査するため、割り切れるケースでもmax が最終チャンクに確実に含まれるようstep を 1 大きく取っており、実際のチャンク数は chunks 以下になります) ステップ2: Rangeの再分配(Reshuffle) 範囲が決まったら次にワーカーに範囲ごとの処理を並列にさせる必要があります。列挙した Range のリストを並列実行するよう明示的に指定するためにPCollection化した Range の後に、 Reshuffle.viaRandomKey()  を挟みます。  Reshuffle  には2つの重要な役割があります。 fusionの分断 : Dataflowは連続する処理を一つの処理Stageとして結合してしまうことがあります。これがないとDataflowは Create と後続の読み取り ParDo を融合し、Rangeが1ワーカーに偏って並列性が出ません。 ランダムキーによる再分配 : 全Rangeが利用可能なワーカーへ均等にばらまかれ、クラスタ全体に読み取り負荷が分散されます。 これがdumplingの「 infiniteChan から複数Writerがタスクを取り出す」構造に相当します。 ステップ3: 各ワーカーでのスナップショット並列読み取り 各ワーカーは処理開始時( @Setup )に自前のコネクションを確立し、パイプライン構築時に取得・配布されたTSOで  tidb_snapshot  をセットします。  全ワーカーが同一TSOを使うことで、分散読み取りでも単一時点の一貫したスナップショットになります 。 TSOの取得は、一貫スナップショットを開始してその時点の論理時刻を読むだけで完了します(トランザクション自体はすぐロールバックします)。 START TRANSACTION WITH CONSISTENT SNAPSHOT;SELECT @@tidb_current_ts; -- ← この値(TSO)を全ワーカーに配布 各ワーカー側では、読み取り前にセッション変数を設定します。 SET @@tidb_snapshot = '<配布されたTSO>'; -- ロックなしで同一MVCC版を読む SET @@session.tidb_enable_paging = ON; -- 大量スキャン時のメモリ使用量を抑制 tidb_enable_paging  はCoprocessorリクエストのメモリ使用量を抑える設定です(TiDB v6.2.0以降はデフォルト有効。変数を知らないDBではスキップ)。 実際の読み取りでは、担当Rangeを分割キーへの 値の範囲条件 に変換してSELECTに組み込みます。 SELECT * FROM table WHERE `pk` >= 1000 AND `pk` < 2000 ここで重要なのは、この条件が 主キー(インデックス)に対する値の比較 である点です。 データベースはこの行値比較を使ってインデックスを シーク し、該当範囲の先頭へ直接ジャンプして必要な行だけを読みます。  OFFSET のように先頭から数え直す必要がないため、どのチャンクも一定コストで読み出せます。 また、読み取りはMySQL Connector/Jの 行単位ストリーミングモード ( fetchSize = Integer.MIN_VALUE )で行います。 これは結果セット全体をワーカー側メモリにバッファせず1行ずつ取り出す特別な設定で、巨大チャンクでもワーカーのメモリ消費が一定に保たれます。 dumplingがgo-sql-driver/mysqlのストリーミング動作で実現しているのと同じ効果を、JDBC側で引き出している形です。 制約: GC SafePoint dumplingはPDクライアント経由でGC SafePointを登録しますが、これは JDBCコネクションからは到達できません。 そのためTiDBSourceではSafePoint登録を行わず、長時間の読み取りに対しては運用側でクラスタの tidb_gc_life_time を引き上げ、読み取り中にGCがスナップショットのバージョンを回収しないようにする運用方針としています。 AlloyDB(PostgreSQL)テーブルからのデータ抽出 AlloyDBとPostgreSQL互換性 AlloyDBは、Google CloudがPostgreSQL互換で提供するフルマネージドのデータベースサービスです。  ワイヤプロトコルもSQLの方言もPostgreSQLと互換 であるため、クライアントから見ればPostgreSQLそのものとして扱え、標準のPostgreSQL JDBCドライバがそのまま使えます。 さらに  ctid (物理行位置)や  COPY 、 pg_relation_size  といったPostgreSQLの内部機能・システムカタログも利用できます。 したがって、AlloyDBからのデータ抽出は  PostgreSQLの機能を前提に設計できます 。 以下では PostgresSource がPostgreSQLのどの機能を使っているかを説明しますが、その内容は基本的にAlloyDBにも当てはまります。 通常のJDBCクエリ取得との違い PostgresSource も「物理的な分割 → 並列読み取り」という骨格はTiDBと同じですが、データ転送経路とテーブル分割の方式 がPostgreSQL固有の機能に最適化されています。 まず、通常のJDBC経由のクエリ取得との違いを整理します。 観点 通常のJDBC ( SELECT ) PostgresSource ( COPY ... TO STDOUT BINARY ) 転送経路 拡張クエリプロトコル COPYプロトコル(バルク転送専用) 行ごとの処理 パース/プラン/結果整形が走りうる クエリは1回プラン。あとはタプルを連続送出 データ形式 テキスト or バイナリのフィールド単位 バイナリのタプルストリームを直接デコード 並列分割 OFFSET / LIMIT 等(大きいほど低速) ctid 物理ブロック範囲(フルスキャン不要) PostgreSQLの COPY は、もともと大量データのインポート/エクスポートのために用意された専用経路で、通常のクエリ実行に伴う1行ごとのオーバーヘッドを回避できます。 PostgresSource はJDBCドライバの CopyManager API(PGCopyInputStream)を使い、COPY (SELECT …) TO STDOUT (FORMAT BINARY) のバイナリ出力ストリームを受け取って、Avroのレコードへ直接デコードします。 中間のテキスト変換を挟まないぶん、CPU負荷とアロケーションを抑えられます。 PostgreSQLの機能を活用したDataflowでの実装 ステップ1: ctidブロック範囲の列挙 PostgreSQLの全タプルには ctid(物理的な行ロケーション = (ブロック番号, ブロック内オフセット))が付与されています。 PostgresSource はテーブルを 物理ブロック範囲 で分割し、各範囲を TID range scan で読みます。 分割の計画には実データのスキャンが一切不要です。 ブロック数は、テーブルの実ディスクサイズをブロックサイズで割って求めます。 SELECT pg_relation_size('table'::regclass) / current_setting('block_size')::bigint pg_relation_size  は実ディスクサイズを返すため、統計情報の  pg_class.relpages  推定値よりも正確で、しかもstat呼び出し1回で済みます。 1ブロックあたりの行密度は推定行数( pg_class.reltuples )から求め、目標行数  splitSize  に対応するブロック幅を機械的に算出します。  [0, blockCount)  をこの幅で割っていくだけなので、 フルスキャンも OFFSET も不要 です。 行密度 = 推定行数 / ブロック数1範囲のブロック幅 = max(1, round( splitSize / 行密度 )) 範囲 = [0, w), [w, 2w), … , [kw, blockCount) ※最後の範囲は上限を開く 最後の範囲を上限なし(オープンエンド)にしておくことで、分割を計画した後に追記された行も読み取れます。各範囲は ctid の半開区間条件に変換されます。 WHERE ctid >= '(0,0)'::tid AND ctid < '(3,0)'::tid この条件も、TiDBの主キー範囲と同様に物理位置の値による範囲比較です。 PostgreSQL 14以降ではこれが TID range scan として処理され、該当ブロックへ直接シークして必要な範囲だけを読みます。 注意点 :  ctid  は物理的な行位置なので、読み取り中にINSERT/UPDATE(別ページへの移動)/VACUUMが起きると、同じ行を二重に読んだり取りこぼしたりする可能性があります。同期中に更新されないテーブルに対して使うか、バッチ読み取りで一般的なスナップショットのズレを許容する前提で利用します。また、TID range scanはPostgreSQL 14以降のサポートで、それ以前のバージョンでは各範囲がシーケンシャルスキャンにフォールバックします。 ステップ2: Rangeの再分配 TiDBと同様、RangeのリストをCreateで撒き、Reshuffle.viaRandomKey()でワーカーへ再分配します。fusion分断と負荷分散の狙いは同じです。 ステップ3: 各ワーカーでのバイナリCOPY並列読み取り 各ワーカーは、担当Rangeのctid条件を組み込んだSELECTを COPY (…) TO STDOUT (FORMAT BINARY) でラップし、バイナリストリームを受け取ります。 COPY (SELECT * FROM table WHERE ctid >= '(0,0)'::tid AND ctid < '(3,0)'::tid)TO STDOUT (FORMAT BINARY) 返ってくるのはPostgreSQLのCOPYバイナリフォーマットです。 PostgresSource はこれを直接パースします。 先頭の固定シグネチャ( PGCOPY\n\377\r\n\0 )とヘッダを読み飛ばし、以降はタプルを1件ずつ読みます。 各タプルは「フィールド数」に続いて、フィールドごとに「長さ( -1 はNULL)+値のバイト列」が並ぶ構造で、終端は番兵値(フィールド数 =  -1 )で示されます。 値のデコードは、PostgreSQLのバイナリ表現をAvroの値へ型ごとに変換します。たとえば次のような処理です。 整数・浮動小数: ビッグエンディアンでそのまま読む numeric : base-10000 のdigit配列から十進数を復元 date  /  timestamp : PostgreSQLエポック(2000-01-01)基準の値をUnixエポック基準へ補正 uuid  /  json  /  jsonb  /  bytea : それぞれの専用処理 これらをドライバのテキスト変換を介さず自前でデコードすることで、転送経路を最短化しています。 なお、IAM認証(Cloud SQL / AlloyDB)にも対応しており、 user 未指定時はワーカーのサービスアカウントをDBユーザとして使い、接続URLに  enableIamAuth=true  を自動付与します。 検証用の12つのカラムを持つ6億件のダミーテーブルデータをAvroファイルとしてGCSに出力するタスクで、6コア並列で8分で処理完了するようになりました。 制約: なぜTiDBSourceのようにワーカー間の一貫性を担保できないのか TiDBSource では tidb_snapshot によって全ワーカーが同一時点を読み、ワーカー間の一貫性を担保していました。一方の PostgresSource では、各 ctid レンジが それぞれ独立したトランザクションで読まれる ため、レンジ間(別接続・別時刻)での一貫性は保証されません。読み取り中にINSERT/UPDATE( ctid が別ページへ移動)/VACUUMが起きると、レンジをまたいで行の重複や欠落が起こりえます。 PostgreSQLにも一見すると同等の仕組みがあります。 pg_dump の並列モードは、 pg_export_snapshot() でスナップショットをエクスポートし、各ワーカーが SET TRANSACTION SNAPSHOT でそれを取り込むことで、ワーカー間の一貫性を担保しています。 PostgresSource で同じことを今回実現できなかった理由は スナップショットの「寿命」の違い にあります。 TiDB( tidb_snapshot ) PostgreSQL( pg_export_snapshot() ) 実体 TSO(論理タイムスタンプ=ただの数値) エクスポート元トランザクションに紐づくスナップショットID 寿命 永続的 (GCされるまで。トランザクション非依存) 一時的 (エクスポート元トランザクションが開いている間だけ有効) 共有方法 数値を渡し、各セッションが独立に SET 元トランザクションが生存中に各セッションが SET TRANSACTION SNAPSHOT で取り込む PostgreSQLのエクスポートされたスナップショットは、 それをエクスポートしたトランザクションが終了するまでしかインポートできません 。 pg_dump の並列モードが成立するのは、単一プロセスのリーダーがダンプ全体の間ずっとエクスポート元トランザクションを開いたまま保持し、自身でワーカーを起動するからです。 PostgresSource が動くCloud Dataflowの実行モデルでは、元トランザクションを 並列読み取りの全期間にわたって保持する場所を確保するのが難しかったのです 。TiDBSourceではランチャーでTSO(数値)がトランザクションと無関係に有効だったためワーカーに配るだけで済みました。 そのため PostgresSource では各レンジ独立読み取りを前提とし、「同期中に更新されないテーブルに対して使う/バッチ読み取りで一般的なスナップショットのズレを許容する」という運用方針にしています。 まとめ: 高スループットを支える設計要素 両モジュールでは、 「テーブルの物理的なデータ配置に沿って分割し、その分割単位を分散ワーカーで並列に読み取る」  という共通する設計思想で実装しました。 以下  SpannerSource (SpannerのPartitionQuery等を活用したモジュール)も加えた各設計要素の比較表です。すでにいずれかのDBに親しんでいる人は別のDBの機能と比較することで関心・理解が深まるかもしれません。 要素 SpannerSource TiDBSource PostgresSource 物理分割の基準 Split境界( PartitionQuery ) TiKV Region境界( TABLESAMPLE REGIONS() ) ctid 物理ブロック範囲( pg_relation_size ) 分割キーの自動抽出 Spannerが自動分割(指定不要) PK /  _tidb_rowid  を自動解決 ctid (全テーブル共通) 分割計画のコスト API呼び出しのみ(フルスキャン不要) 境界キーの列挙のみ(フルスキャン不要) stat呼び出しのみ(フルスキャン・ OFFSET 不要) 範囲の読み取り Partition Tokenを実行 PK値の範囲比較でインデックスをシーク ctid の範囲比較でTID range scan 転送機構 gRPCストリーミング( executeStreamingSql ) ストリーミングResultSet( fetchSize=MIN_VALUE ) バイナリCOPY( COPY ... TO STDOUT BINARY ) 一貫性 BatchReadOnlyTransaction ( TimestampBound ) tidb_snapshot によるMVCCスナップショット バッチ読み取り( ctid スナップショットのズレは許容) 並列化 Partition Tokenを Reshuffle  +  ParDo で分散 Reshuffle  +  ParDo (dumplingのWriter並列に相当) Reshuffle  +  ParDo バージョン保護 Partition Tokenで自動保持 tidb_gc_life_time  の引き上げで代替 (該当なし) フォールバック Partition Query → 通常Query Region → 数値MIN/MAX → 全体 TID range scan → シーケンシャルスキャン(PG14未満) 汎用JDBCに対する優位性は、(1) 分割キーを自動抽出でき、分割計画も安価なので並列度を素直に上げられること、(2) 各ワーカーの転送がネイティブ機構でCPU/メモリ効率が高いこと、(3) DB固有のスナップショット機構で一貫性を担保できること(TiDB) の3点に整理できます。これらはもともとSpannerSourceがSpannerの機能で実現していた特性を、今回 TiDB / PostgreSQL 固有の機能を組み合わせて再現したものになります。 おわりに 多数のマイクロサービスが抱える大量のテーブルをDWHへ同期するという要件に対し、汎用的なJDBC経路ではなく、TiDBとPostgreSQL(AlloyDB)それぞれの  分散ストレージアーキテクチャや物理データ配置を活かした専用Sourceモジュール  をCloud Dataflow上に実装しました。これは、すでにSpannerからの取得で高スループットを実現できていた「分割 → 配布 → 並列スナップショット読み取り」というパターンを、他のDBにも横展開した取り組みと言えます。 今回は各DBが公式ツール( dumpling  / pg_dump )で利用されている高速化のノウハウを参考にさせてもらい、Dataflowの分散実行モデルに取り込みました。「分割キーの自動抽出」「物理配置に沿った安価な分割」「ネイティブなバルク転送」「DB固有のスナップショット」という要素の組み合わせが、大規模テーブルでも高いスループットと一貫性を両立させる助けになりました。 次の記事は cyan さんの「Scaling Myself: How I Run 22 Claude Code Sessions for DS4 Migration」です。引き続きお楽しみください。
p.fontbold{ font-weight: bold; } p.codeboxbefore{ margin-bottom:-70px; } @media screen and (max-width: 575px) { p.codeboxbefore{ margin-bottom:-13.4vw; } } この記事は Merpay & Mercoin Tech Openness Month 2026 の 13 日目の記事です。 こんにちは。Growth Platform Team でメルペイのポイント還元キャンペーン基盤である Santa サービスの開発を担当している @hasegway です。 なお、タイトルに登場する「Otoku Revolution」とは、コード決済を一定回数使うたびに必ず値引き体験が届く新企画のキャンペーン (本記事では「コード決済の回数連動キャンペーン」と呼びます) の社内呼称です。詳しくは本連載 17 日目の @yutaro の記事を楽しみにしていてください。本記事では、長く運用してきた Santa サービスをルールベースの汎用基盤 (以降「Rulebase 基盤」と呼びます) として書き直したリファクタリングの話と、新基盤の最初のキャンペーンとして「コード決済の回数連動キャンペーン」を立ち上げた話を取り上げます。順を追ってお話しする前に、まず Santa という基幹サービスがどのような歴史を経て、どんな負債を抱えるに至ったかについてお話しさせてください。 Santa の歴史 Santa という名前は、初期に 「使った翌日にバッチ処理でポイントを付与する」 サービスだったところから来ています。夜のうちに溜まったイベントを翌朝まとめて配って回る、シンプルな仕組みでした (初期の仕組みについては メルペイのキャンペーンを支えるサンタの秘密 が詳しいです)。 それから何年も経て、今では 1 日数百万イベントを処理し、メルカリ / メルペイのさまざまな利用シーンでポイント付与を行う基幹サービスへと成長しました。この成長の過程で一貫していたのは、 「キャンペーンの種別ごとに専用のイベントパイプラインを実装する」 スタイルで機能を積み重ねてきたことです。それぞれ独自のテーブル、独自のイベントハンドラ、独自の Cap (上限) ロジック、独自のポイント付与フローを持っていました。 2021 年のフィルタリング機能 は「複数の条件を AND/OR で組み合わせる」 発想を Santa に持ち込んだ、汎用化の最初の一歩でした。 2022 年のメルカード常時還元 は、その上に「メルカードステージ別の還元率」「複数月にまたがる Provision (引当金) 管理」 といった精緻なロジックを乗せた大規模なパイプラインで、現在でも Santa 最大のパイプラインです。それでも、キャンペーン本体のコードは CampaignType ごとに別物のままでした。 この構造は当初の要件においては合理的でした。しかしキャンペーン要件の複雑化とともに、コードの再利用性が低い設計による開発速度の低下、バグのキャンペーン別対応による運用負荷の増大といった課題が積み重なっていきました。2025年夏の時点では、Santa エンジニアチーム内でこれら課題への共通理解ができており、次世代キャンペーン構造のラフ設計まで進んでいました。ただし当時はスケジュール上の制約で本格着手を見送り、2025年10月にローンチしたメルカリモバイル向け特典キャンペーンは既存システムを拡張する形で実装しました。 その後、PoCを進め、2025年12月にチーム向けに成果を発表。2026年春にかけて、汎用キャンペーンテーブルにルール評価とアクション実行モジュールを組み合わせた Rulebase 基盤を新規に構築しました。現時点では「コード決済の回数連動キャンペーン」を1件稼働させている段階で、既存のキャンペーン群は引き続き従来の専用パイプラインで動いています。これら既存キャンペーンの段階的な移行は、これから先のフェーズです。 専用パイプラインが抱えた負債 Rulebase 基盤を作る前の Santa は、キャンペーンの種別(CampaignType) ごとに「専用パイプライン」を実装する、というスタイルで成長してきました。代表的なキャンペーン種別と、各種別の代表的なキャンペーン企画は次のとおりです。 キャンペーン種別 主なキャンペーン企画 購入時還元 「買ってお得!dポイント」など メルペイの定額払いの還元 「はじめての定額払いキャンペーン」など メルペイスマートマネーの還元 「初回利用」「カムバックキャンペーン」など メルカード還元 「メルカード常時還元」など これらの種別はそれぞれが、専用のテーブル群、決済や返済を受け取る Pub/Sub の入り口、ビジネスロジックを担う Interactor、ポイント付与履歴テーブルへの書き込みパス、付与上限の計算ロジックを抱えています。また、企画ごとの細かい要件の実現のため、基本のパイプラインの中にさまざまな分岐処理が加えられています。新たにキャンペーン企画を 1 本立てるたびに、これらのさまざまなレイヤーを個別に調査・変更しなければならない、というのが Santa の標準作業でした。 そして、長年運用するなかでこの構造がいくつかの負債を生んでいました。 累積した運用負債 まず、専用パイプラインを実装するスタイルでは、新たなキャンペーン要件のための追加開発が横展開して再利用しづらい問題がありました。また、当初に想定していたキャンペーン要件では考えられなかった新たな要件は、個別実装で対応せざるを得ないこともありました。これらは徐々に開発速度の低下やリグレッションテストの複雑化を招いていきました。特に MercardCampaignType ではこの問題が顕著で、リファクタリングに踏み切る直接的な引き金になりました。 MercardCampaignType が「なんでも置き場」化していった 2022 年にローンチした MercardCampaign パイプラインは、当初「利用ステージ別の還元率」と「清算起点のリアルタイムのポイント付与」を素直に扱うことを想定した、シンプルな常時還元キャンペーン向けの設計でした。 その後、メルカードまわりのキャンペーン要件は急速に広がっていきます。累計購入額連動キャンペーン、メルカリ NFT 決済への対応、メルカード ゴールド、メルカリモバイル契約者向けの特典など、どれも「メルカード保有者・利用者」という共通の文脈はあるものの、設計当時の想定にはなかった要件ばかりです。それでも置き場としては MercardCampaignType が最も適切だったため、これらの新要件は順次 MercardCampaignType の上に実装されていきます。本来シンプルに設計されていた入れ物に想定外の機能が次々と追加され、還元率算出や会計コード指定などの仕組みが本来の用途を超えて流用されるようになっていきました。 ここでは、特に歪みが目に見える形で表面化した3 つの事例 — 累計購入型 (2024 年 9 月)、メルカリ NFT 会計コード差し替え対応 (2025 年 12 月)、メルカリモバイル向け特典キャンペーン (2025 年 10 月) — を順に見ていきます。 累計購入型 と メルカリ NFT 会計コード差し替え 累計購入額連動キャンペーンでは、購入金額が複数のしきい値を順に超えるたびに段階的にポイントが付与されます (例: 累計購入額に応じて最大 P1,500 もらえる)。このキャンペーンが MercardCampaignType に投入され、本来の還元率スキーマに「購入累積トラッカー」型の挙動が乗りました。メルカリ NFT 会計コード差し替え対応では「キャンペーンの各種条件は他と共通にしつつNFT取引のみ会計コードを差し替える」という要件のため、コード内に分岐処理が追加され、キャンペーン設定値が複雑化しました。どちらもメルカード保有者向けの施策ではあるものの、当初想定の責務範囲を超えた要件です。 そして3 例目の、もっとも歪みが大きく出たケースが、メルカリモバイル向け特典キャンペーンです。 メルカリモバイル向け特典キャンペーン (2025 年 10 月) メルカリモバイル向け特典キャンペーンの要件は、 3 種類のメルカードステータス (保有無し / 通常版 / ゴールド) × 4 種類のモバイルデータプラン (4GB / 10GB / 20GB / 40GB) = 12 の独立したキャンペーンパターンが毎月必要、というものでした。 実装上は、本来メルカードステージ別の還元レートを保持するDBフィールドが「データプラン別のレート」の入れ物として流用されました。3 ステータス ×4 データプランの 12 組み合わせを、メルカード用テーブルの行として毎月生成する運用です。 また、お客さまのメルカードステータスやモバイルデータプランは日々変わりうるため変化に合わせて還元レートや月々の上限を計算し直す必要があります。そして、同じお客さまが同月内で別の組み合わせ向けキャンペーンにも二重にマッチして重複付与が起こりうる、というリスクも残ります。後者の重複付与を防ぐために、コード側にはこんな雰囲気のハードコードマップが入りました (簡略化したイメージ)。 // ポイント重複付与防止のため、キャンペーンIDをコードに埋め込み TemporaryCampaignIDMapping = map[string]string{ "202510": "campaign-id-1", "202511": "campaign-id-2", // 毎月追加が必要... そして各所に、データプラン別ステージを判定する if 文が散らばりました。 こうして、毎月 12 パターン分のデータ追加運用が必要な「Temporary」ハードコードマップが本番に居続け(Temporaryとは・・)、モバイル専用ステージを分岐させる if 文がポイント計算・フィルタ評価・API レスポンスに散在し、将来データプランが1つ増えるたびにコード変更とデプロイが必要になり、MercardCampaignType 全体のリグレッションテストも巻き込む、という構造が出来上がっていきます。当初は 「モバイルキャンペーンをアーキテクチャ刷新のきっかけにして抜本対応する」 計画もありましたが、ローンチ期日との両立が難しく、最終的に既存パイプラインを拡張する判断を取りました。当時の制約下では合理的な選択ですが、根本的な構造課題は持ち越し、運用負荷は増えています。 3 事例ともビジネス文脈では筋が通っている一方、「メルカード還元の入れ物」が「メルカード周辺キャンペーン全般の入れ物」として使われ、MercardCampaignType のスキーマと責務範囲が押し広げられてきたのが当時の状態で、そのひずみは無視できない大きさになっていました。 Rulebase 基盤の設計 このリファクタリングそのものは前から検討していたものの、ローンチ責任との両立が難しく、本格着手は半年寝かせています。その間も内部で PoC は進め、本実装で固めた方針は「キャンペーンの挙動を、専用コードから設定データへ」というシンプルなものです。 より具体的には、 「どのようなきっかけで動くか」 (TriggerType)、 「どのような条件でマッチさせるか」 (RuleCondition と、その評価を担う RuleEvaluator)、 「何をするか」 (ActionExecutor) という3 つの軸を、できるだけ atomic な (再利用可能なサイズの) 部品として定義し、その組み合わせで多様なキャンペーン要件に対応する、というのが基本的な発想です。従来のように「メルカードキャンペーン専用のロジックを書き下ろす」のではなく、Trigger / Condition / Action を小さな部品として用意し、キャンペーン定義はその組み合わせとして書く、という発想です。専用パイプライン時代との根本的な違いはここにあります。一度実装した Condition や Action を別の CampaignType から設定値だけで呼び出せたり、動的な条件分岐や繰り返し条件を1 つのキャンペーン定義の中で表現できたりする能力も、ここから生まれます。 全体像 チーム内発表でも、次の対比を使って説明しました。 As-Is (メルカード専用ハンドラの内部に if が積まれている) func (h *MercardCampaignHandler) Execute(event Event) { if user.Stage == StageA { if user.Status == "Active" { if !h.HasReward(user.ID, event.ID) { points = amount * 0.03 h.RewriteRewardHistory(user.ID, event.ID) } } } else if user.Stage == StageB { if user.Status == "Gold" { points = amount * 0.10 if h.HasReward(user.ID, event.ID) { h.RewriteReward(user.ID, event.ID, points) } } } } To-Be (キャンペーン定義は JSON データ、評価エンジンは汎用) { "rule_id": "mobile-std-4gb", "conditions": [ {"type": "user_attribute", "params": {"stage": "Standard", "plan": "4GB"}}, {"type": "period", "params": {"start": "2025-10-01", "end": "2025-10-31"}}, {"type": "payment_attribute", "params": {"transaction_type": "code_payment"}} ], "action": { "type": "ADD_POINTS", "params": {"rate": 0.05, "currency_points": 60, "monthly_cap": 200} } } この JSON を Spanner に永続化したスキーマが次の3 テーブルです。Campaigns 配下に CampaignRules、その配下に RuleConditions を INTERLEAVE IN PARENT で並べる構造になっています。 -- 大枠の宣言: 期間・CampaignType・キャンペーン固有設定 (JSON) Campaigns( CampaignID, CampaignType, StartAt, EndAt, Metadata, ... ) -- 1 キャンペーン内の複数ルール: 何をトリガに、どう集計し、どんなアクションを取るか CampaignRules( CampaignID, RuleID, TriggerType, CalculationType, ActionType, ActionParams (JSON), Priority, Enabled ) INTERLEAVE IN PARENT Campaigns -- 1 ルール内の複数条件: AND/OR グループで合成 RuleConditions( CampaignID, RuleID, ConditionID, ConditionType, ConditionParams (JSON), ConditionGroup ) INTERLEAVE IN PARENT CampaignRules これに対応する評価エンジン (Rule Evaluation Engine) を新設し、以下のような4 層構造の EvaluationData を入力として走らせます。 Layer 内容 Event Data 受信した Pub/Sub イベント Rule & Campaign DB から読んだ CampaignRule + RuleConditions User Data お客さまの属性 (カード種類、利用履歴など) Providers 外部サービスへの DI ハンドル イベントが届いてから付与までの流れは次のとおりです。 Condition/Rule と Action は、新しい種別が必要になったときに対応する実装を追加しておくことで、以降のあらゆるキャンペーン定義から再利用できる仕組みになっています。新規キャンペーンの立ち上げ自体は、すでにカタログに揃っているものの組み合わせで実現できるものであれば、コード変更を伴わずに SQL の INSERT で反映できます。 3 つの軸の中身 ここからは、前述した3 つの軸が Rulebase 基盤の中でそれぞれどう設計されているかを順に見ていきます。 TriggerType — どのようなきっかけで動くか CampaignRules テーブルの TriggerType 列が、各ルールが「どのイベントに反応するか」 を表します。 TriggerType は外部イベントと1対1になるよう定義しており、Pub/Sub から届いたメッセージを内部ドメインのモデルに正規化したうえで、合致する TriggerType を持つルール群を CampaignRules から引き当て、条件マッチングを担う2 段目のハンドラ層に引き渡すところまでが、この軸の責任範囲です。条件評価や副作用はここでは扱わず、後段に切り出しています。 RuleCondition と RuleEvaluator — どのような条件でマッチさせるか CampaignRules に紐づく RuleConditions テーブルには、評価したい条件が1行1 件 並んでいます。各レコードは次の3 つで構成されます。 ConditionType は、その条件がどんな種類の判定をするかを示す分類で、後段でどの ConditionEvaluator にディスパッチするかを決めます。 ConditionParams は、その ConditionType に渡す具体的なパラメータで、種別ごとに必要な引数が違うため、固定スキーマではなく JSON で柔軟に保持しています。 ConditionGroup は、複数の条件を AND と OR で組み合わせるためのグルーピングラベルで、これを使うことで A AND (B OR C) のような複合的な論理式を、フラットな行データで表現できるようにしています。 たとえば「ある期間内で、お客さま属性 B か C のいずれかにマッチしたら成立」 という A AND (B OR C) の条件は、RuleConditions テーブルに次のように 3 行で並びます。 ConditionID ConditionType ConditionGroup グループの意味 A period NULL 単独で AND B user_attribute g1 グループ g1 内で OR C payment_attribute g1 グループ g1 内で OR 評価エンジンの入力は、先に挙げた4 つのレイヤーの情報を1つに束ねた EvaluationData です。ConditionEvaluator 側からは「これさえ読めば判定に必要な値はそろっている」 状態で参照できるようにしてあります。PoC ではここにキャンペーン固有の計算結果も持たせる案を検討していましたが、本実装では「条件評価のフェーズとアクション実行のフェーズで責務を分けるべき」と判断し、 EvaluationData は条件評価に必要な情報だけに絞っています。 評価エンジン本体は、ConditionType ごとの個別判定を担う ConditionEvaluator と、ルール 1 件分の真偽をまとめる RuleEvaluator の 2 段構成です。 下段の ConditionEvaluator は、ConditionType ごとに 1 つずつ実装が用意されており、 EvaluationData を読み取ってその条件 1 件分の真偽を返します。判定は副作用を持たず、外部 API 呼び出しや DB 書き込みは起こりません。 上段の RuleEvaluator は、その上に乗ってルール 1 件分の評価を組み立てます。具体的には、(1) RuleCondition の各行をその ConditionType に応じた ConditionEvaluator に振り分け、(2) 各行が返した真偽を ConditionGroup のセマンティクス (NULL は単独 AND、同じ値どうしは OR、別の値どうしは AND) で AND/OR 合成し、(3) ルール 1 件分の最終的な真偽と、どの条件がどう寄与したかの内訳を返します。返るのは真偽と内訳だけで、計算結果や副作用は含めません。 新しい評価軸が必要になったときは、対応する ConditionEvaluator を実装して ConditionType の enum に登録します。一度カタログに加われば、以降のキャンペーン定義は RuleConditions の 1 行としてその軸を設定値ベースで呼び出せます。 ActionExecutor — 何をするか マッチしたルールに対応する Action を、 ActionType ごとの Executor が実行します。上限計算、ポイント付与ステータスの遷移、ポイント台帳への書き込み、外部へのイベント発行といった副作用は、ここで起こります。Condition / Rule 側は副作用を持たない設計なので、外部に作用するロジックはこの層に集約されます。新しい Action 種別が必要になったときは、対応する Executor を実装して ActionType の enum に登録します。一度カタログに加われば、以降のキャンペーン定義は CampaignRules.ActionType と ActionParams を指定する形で、その Action を汎用的に呼び出せます。 3 軸を支える共通ヘルパー Executor の責務は、Action ごとの副作用そのもの (DB 書き込み、外部 API 呼び出し、PointStatus の遷移など) です。一方で、ポイント計算と Cap (上限) 適用、PointStatus / ProvisionStatus のライフサイクル管理、外部マイクロサービスに渡す冪等キーの生成、設定値テンプレートの展開といった「Action 種別をまたいで毎回必要になる横断的な処理」は、3 軸とは別レイヤーの共通ヘルパーに切り出しています。各 Executor は、自分の Action に応じて必要なヘルパーだけを必要なときに呼び出す形にしてあります。実装したヘルパーはいくつかありますが、代表例を 2 つ挙げます。 PointLifecycleManager (PointStatus / ProvisionStatus のオーケストレーション) ポイント付与にまつわるライフサイクルをまとめて扱うヘルパーです。 お客さまへのポイント付与状態 (PointStatus) と、 会計上の引当状態 (ProvisionStatus) は、それぞれ独立したステートマシンとして表現しています。 上が PointStatus で、 planned で台帳に予定を立て、外部の付与 API が成功すると confirmed 、後処理まで終わると completed に進みます。 planned 直後にキャンセルされた場合は cancelled 、外部呼び出しが失敗した場合は failed に倒れる、というのが主な遷移です。下が ProvisionStatus で、引当が立っていない not_linked から、引当が紐づいた linked を経て、最終的に revoked (引当処理が確定した状態) に遷移するのが主軸です。 旧メルカードパイプラインでは、PointStatus の遷移と ProvisionStatus の遷移がそれぞれポイント付与処理側と引当処理側に分散して実装されていて、両者がどう連動しているかの見通しが悪くなっていました。Rulebase 基盤では、両方のステートマシンを PointLifecycleManager 配下に切り出し、 CompletePoint / ReversePoint / CreateProvision / UpdateProvision / RevokeProvision の各 Action から必要なときに呼び出す形にしてあります。これによって、複数月にまたがる引当が必要なメルカード系のキャンペーンも、引当が不要なシンプルなキャンペーンも、同じ部品の組み合わせでライフサイクルを表現できます。 TemplateExpander (設定値テンプレートの展開) 旧システムには汎用的な文字列テンプレートの仕組みがなく、たとえば会計コードを決済種別ごとに分けて積むような要件が出てくると、会計コードのパターン数だけキャンペーンレコードを別に切るしかありませんでした。本来 1 つのキャンペーンとして扱いたいものをパターン数だけ重複登録する必要があり、運用負荷の原因として積み上がっていた部分です。 これを汎用化するために用意したのが TemplateExpander です。キャンペーン定義のなかに {user_id} / {campaign_id} / {payment_count} といったプレースホルダーを書いておくと、実行時に EvaluationData から取り出した実値に置き換えます。会計コード (たとえば merpay_xxx_campaign-{campaign_id}-{user_status} ) や、ハッシュのシード文字列 ( {user_id}:{campaign_id}:{payment_count} ) などがその利用例です。新しいパターンが必要になったときも、設定マスタ側のテンプレート文字列を差し替えれば、コード変更や追加レコードなしに同一キャンペーンの中で扱えます。 最初の実装事例: コード決済の回数連動キャンペーン ここまでに作った汎用基盤の「入れ物」を使った最初の移行ですが、感覚的には「リスクが低く、運用負荷の高いところ」 から始めたくなります。新しめで蓄積データの少ないものほど移行リスクが下がり、運用負荷が高いほど移行の効果が出やすいからです。当初はメルカリモバイル向けキャンペーンを最初の移行対象に据える予定でした。元々これをアーキテクチャ刷新のきっかけにする案も挙がっていましたし、稼働開始から日が浅く蓄積データが少ない一方で 12 パターンの毎月運用で運用負荷が高い、というまさにスイートスポットの条件に当てはまっていたためです。 ただ、ちょうどそのタイミングで、新規企画として「コード決済の回数連動キャンペーン」 の話が立ち上がります。新規企画であれば、常時稼働している既存パイプラインを止めずに載せ替える、というコストを払わずに、最小構成で「設定値ベースでキャンペーンを組み立てる」 ことの検証ができます。結果として、最初の事例は移行ではなくゼロからの立ち上げに振り直し、本企画のキャンペーンを Rulebase 上で直接立ち上げる形で進めました。既存のメルカリモバイルなどの移行は、先のフェーズに持ち越しています。 この新規企画は、N 回利用するごとに付与が発火するシンプルな仕組みで、お客さまから見ると「使い続けるほど、確実に値引きが返ってくる」体験になります。 これをRulebase上で組むにあたって必要だったのは 累計カウントと周期報酬を扱う ActionType ( COUNT_AND_REWARD )を 1 つだけ追加することだけでした。あとは 既存の ConditionType (期間 / お客さま属性 / 決済属性) の組み合わせで実装できた点です。Rulebase 上で「ルール評価とアクション実行を最大限再利用し、なるべく設定値だけで組み立てる」形を、最初に検証できたケースになりました。 仕組み自体はシンプルです。お客さまのエントリ状況を「お客さま属性条件」で見て、対象決済を「決済属性条件」で絞り込んだうえで、 COUNT_AND_REWARD Executor がカウンタをインクリメントします。カウンタが規定回数に達するとポイントを付与してカウンタをリセットし、これをキャンペーン期間中ずっと繰り返す、という流れです。データとしては、Campaigns 1 行、CampaignRules 1 行、RuleConditions 数行、というシンプルな構造で表現できます (簡略化したイメージ)。 // Campaigns { "campaign_id": "...", "campaign_type": 9, // InfinitePayment "start_at": "2026-04-01", "end_at": "2026-09-30", "metadata": { ... } } // CampaignRules (1 件) { "trigger_type": "PaymentCharge", "action_type": "COUNT_AND_REWARD", "action_params": { "trigger_count": 3, "reward_currency_points": 100 } } // RuleConditions (3 件: 期間 / エントリ状況 / 決済種別) { "condition_type": "period", "params": { ... } } { "condition_type": "user_attribute", "params": { "attribute": "entry_status", "promotion_key": "..." } } { "condition_type": "payment_attribute", "params": { "attribute": "transaction_type", "value": "code_payment" } } 実装上の新規追加は COUNT_AND_REWARD Executor と、カウンタを保持する 2 つのテーブル (現在のカウントとログ) くらいです。他は基盤の汎用部品をそのまま組み合わせて完結しました。 補足として、この回数連動キャンペーンの意義は「単純なキャンペーン派生がやりやすくなった」ことではありません。SQL INSERT で似たキャンペーンを増やすこと自体は、従来の専用パイプライン時代でもそれなりにできていました。Rulebase 基盤で新しく可能になったのは、CampaignType をまたいだ Rule・Action の設定ベース再利用です。一度実装した Condition や Action は、別の CampaignType のキャンペーンからも設定値の組み合わせで呼び出せます。回数連動キャンペーンで使った COUNT_AND_REWARD も、他のキャンペーンが「累積回数で発火する」要件を持てば、コードを書き足さずに設定だけで使い回せます。 同じ仕組みは、動的な条件分岐を 1 つのキャンペーン設定で表現することにも転用できます。たとえばメルカリモバイル向けキャンペーンでは「3 ステータス × 4 データプラン」を 12 個の独立キャンペーン定義として毎月準備していましたが、Rulebase なら属性条件・繰り返し条件・上限条件を組み合わせて 1 つのキャンペーン設定の中で表現できます。「N 回ごとに発火」のような周期的な振る舞いも、 COUNT_AND_REWARD を Executor 側に持つことで汎用パーツとして扱えます。今後の新企画では、既存のルール・アクションの組み合わせで表現できる範囲が広がっているかぎり、ゼロからの実装ではなく「設定値だけで作って試す」ところから入れる、というのが新基盤の最大のメリットです。 余談: ランダムなのに、何度引いても同じ金額 仕様としては 「規定回数に到達したら、複数の候補金額から重み付きランダムで 1 つを引き当てる」 という要件があります。素直に math/rand で抽選してしまうと、Pub/Sub の at-least-once 配信下では同じ PaymentCharge が再配信されたときに 1 回目と 2 回目で別の金額が選ばれてしまうことがあります。PointID ベースの冪等性チェックは通っているのに、2 回目の試行から見える金額が初回付与額とずれる、というケースが起きうる構造です。 そこで抽選は乱数ではなく決定論的ハッシュで行いました。 user_id ・キャンペーン識別子・ payment_count を組み合わせた文字列をシードに、SHA-256 でハッシュ化したものから重み付きで候補を 1 つ選びます。アルゴリズムの概略は次のとおりです (簡略化したイメージ)。 // seed の例: "12345:campaign-abc:3" (user_id : campaign_id : payment_count) seed, _ := expander.Expand(params.DeterministicReward.SeedFormat) h := sha256.Sum256([]byte(seed)) v := binary.BigEndian.Uint64(h[0:8]) // 先頭 8 バイトを uint64 として取り出す var totalWeight uint64 for _, rv := range variations { totalWeight += rv.Weight } target := v % totalWeight // 重みの合計で割った余り (これが候補選択用の値) var cumulative uint64 for i, rv := range variations { cumulative += rv.Weight if target < cumulative { return rv, i // 累積重みで当たった候補を選択 } } お客さまから見た「使うたびに違う金額が返ってくる」体験はそのままに、同じ (user_id, campaign_id, payment_count) の組み合わせに対しては何度 retry が走っても、別プロセスから読まれても、同じ金額が一意に確定します。 そして、シードさえ揃えば抽選結果は確定値なので、Pub/Sub のイベント処理完了を待たずに「今回付与される金額」を計算できます。決済成功直後の API レスポンスやフロントエンド側で「今回は ○ ポイントです」と確定表示を返すことができ、後段で UserPoints が書かれたあとに金額が変わって見える、といった不整合の心配もありません。at-least-once と weighted random を両立させるための仕組みが、お客さまへの早い表示にもそのまま使える形になっています。 QA 環境の改善に助けられた話 今回の QAではいくつかの環境パターンを切り替えて実施する必要があり、工数ボリュームに不安を感じていました。ちょうど良いタイミングで今まで Santa サービスでは実現できていなかった「イベント駆動部分専用の QA 複製環境」をチームメンバーが作りきってくれたことで、並行して QA を実施できるようになり、大変助かりました。その詳細は本連載 8 日目の @mikupo の記事 で詳しく解説されています。 おわりに 今回のリファクタリングでは、「キャンペーン定義をコードからデータへ」 という方針のもと、3 テーブル + 評価エンジンに再構成しました。ローンチ責任との両立で半年寝かせていたリファクタリングの機会を諦めずに掴んで「入れ物」を作り切って本番でキャンペーンをひとつ稼働させたことで、システムに今後の新要件についての受け皿を作ることができました。 ただし、回数連動キャンペーン自体は本記事執筆時点ではローンチしたばかりで、まだ派生キャンペーンの実例はありません。設定だけで派生が立ち上がる世界の本格検証は、これからの新企画を通じて行っていくフェーズです。加えて、メルカード常時還元などの既存のパイプラインもまだ稼働しており、Temporary マップも本番に残ったままです。常時稼働するキャンペーンシステムでは、入れ物を作る難しさよりも、止めずに載せ替えるタイミングと手順の設計こそが、ここから先の本題になります。 今回の記事は以上になります。なにか参考になることがあれば幸いです。 次の記事は orfeon さんの「TiDB / AlloyDB の大規模テーブルを高速にBigQueryni同期するための工夫」です。引き続きお楽しみください。
2025 年 3 月に Amazon OpenSearch Service による検索ワークショップ(日本語版)のご紹介 という記事を公開し、OpenSearch の基本概念から AI を活用した検索までを学べる日本語ワークショップをご案内しました。 このたび、2 つの日本語版ワークショップが仲間入りいたしましたので、ご紹介いたします。 EC サイト検索ワークショップ :架空の EC サイトを題材に、検索機能を全文検索からセマンティック検索、マルチモーダル検索、エージェント検索へと段階的に育てていくワークショップです。また、ユーザーの行動ログを使った品質計測、機械学習による最適化を体験いただける実験的なラボも付属しています。 OpenSearch Observability Stack ワークショップ :OpenSearch を Observability のバックエンドとして使い、マイクロサービスの APM・ログ・メトリクスを横断しながら、Agentic AI も活用して障害の原因を調査するワークショップです。Agent Trace といった新しい OpenSearch の Observability 関連機能もお試しいただけます。 以降、2 つのワークショップの概要について説明してまいります。 EC サイト検索ワークショップ 架空の企業「AnyCompany」が運営する EC サイトを題材に、商品検索を少しずつ改善していくシナリオ形式のワークショップです。OpenSearch の全文検索の導入からスタートし、ファセットやセマンティック検索、エージェント検索に至るまで検索機能を段階的に進化させていきます。 FastAPI + htmx による EC サイト風の検索 UI が付属しており、機能追加によって検索結果がどのように変化していくかを実際に比較・確認しながら進めることが可能です。 本ラボは Amazon SageMaker Studio の JupyterLab 上でノートブックを順次実行しながら進めていきます。商品のマスター情報は Amazon Aurora PostgreSQL に、検索インデックスは Amazon OpenSearch Service に格納されています。各種 ML モデルへのアクセスは、OpenSearch の connector を経由して行われます。 主要なリソースは以下のとおりです。 リソース 用途 Amazon OpenSearch Service 検索エンジン Amazon Aurora Serverless v2 商品マスターデータ(PostgreSQL) Amazon SageMaker Studio ノートブック実行環境(JupyterLab) Amazon SageMaker AI Embedding モデルのホスティング Amazon Bedrock LLM の利用(Agentic Query) ワークショップの構成 本ワークショップは 2 つのトラックが存在します。トラック A のみでもお楽しみいただけますが、トラック B まで実行することでより検索に対する理解を深めることができます。 トラック A: 検索機能の改善 トラック A は機能の改善にフォーカスしています。4 つのラボで構成されています。 全文検索の導入: OpenSearch の全文検索を導入し、形態素解析器 Sudachi による日本語トークナイズ、複数フィールドをまたいで検索する multi_match 、関連度スコアによるランキングを実装します。検索エンジンがなぜ高速に全文検索できるのか、その心臓部にあたる転置インデックスの仕組みにも触れます。 セマンティック検索の導入: テキストの「意味」で探すセマンティック検索を導入します。OpenSearch 3.1 で追加された semantic フィールドを使えば、インジェストパイプラインを書かずにベクトル検索を実現できます。Embedding モデルには日本語特化の ruri-v3-310m(Apache 2.0)を SageMaker エンドポイントで利用し、BM25 とセマンティック検索を組み合わせたハイブリッド検索も体験します。 マルチモーダル検索の導入: テキストだけでなく、画像で商品を探す体験を追加します。「この写真に似た商品を探して」といったユースケースに応えるため、画像とテキストを同じベクトル空間にマッピングする CLIP(clip-japanese-base-v2)を使います。Ingest Pipeline の text_image_embedding プロセッサが投入時に画像を自動でベクトル化するので、テキストから画像を探す検索と、画像から似た画像を探す検索の両方を、1 つのベクトルフィールドだけで実現できます。 エージェント検索の導入: OpenSearch 3.2 で追加された Agentic Query を使い、自然言語で商品を検索できるようにします。ユーザーの質問を LLM が QueryDSL に変換し、思考の過程も含めた結果を出力します。Amazon Bedrock 上の Anthropic Claude モデルと組み合わせて、検索バーに軽量な Flow Agent(Claude Haiku 4.5)を、チャット UI にはメモリ機能をサポートした Conversational Agent(Claude Sonnet 4.6)を搭載し、その違いを比較します。 トラック B: 計測と改善 トラック B は計測と精度改善にフォーカスしています。3 つのラボで構成されています。 ユーザー行動ログの蓄積と分析: UBI(User Behavior Insights)は、検索クエリ・クリック・カート追加・購入といったユーザー行動を、標準スキーマで記録する仕組みです。本ワークショップではサンプルの行動データを使った分析を実際に体験します。 検索品質評価: UBI データを使って、検索品質を数値で評価します。Search Relevance Workbench(SRW)と呼ばれるツールを活用し、テキスト検索やハイブリッド検索等の各種検索手法がどのようなクエリで有効であるかを評価指標に基づいて分析します。人間による分析に加えて、MCP + Strands Agent を活用したエージェントによる分析も応用パートとして提供しています。 Learning to Rank: UBI データと SRW の判定セットを学習データに使い、XGBoost(LambdaMART)でモデルを学習します。そして、ハイブリッド検索で絞り込んだ上位を LTR で並べ替える 2 段階ランキングを構築します。特徴量は BM25 派生のものから始め、人気度や在庫といったビジネス特徴量、さらにユーザーのペルソナを段階的に組み込んでいきます。 OpenSearch Observability Stack ワークショップ OpenSearch Observability Stack は、OpenSearch と Prometheus をバックエンドとした、Observability プラットフォームです。Piped Processing Language (PPL) によるトレース・ログ・メトリクスの分析、ダッシュボード上での可視化、Agent Trace、アラートや異常検知といった機能を OpenSearch UI と呼ばれるダッシュボードを通じて利用することができます。Ask AI と呼ばれる機能を活用した AI によるインシデント分析も可能です。 このワークショップでは、16 のマイクロサービスで構成されている EC サイトで発生した問題を、OpenSearch Dashboards の Observability 機能と Agentic AI(AI アシスタント)を用いて調査を行いながら各機能についての理解を深めていく内容となっています。 OpenTelemetry Demo (EC サイトを模した 16 のマイクロサービス)を Amazon EKS 上にデプロイし、OpenTelemetry Collector が集めたトレース・ログ・メトリクスを Amazon OpenSearch Ingestion(OSIS)と Amazon Managed Service for Prometheus(AMP)に送り込む構成となっています。 OSS 版ではセルフホストが必要な部分を AWS マネージドサービスに置き換えているため、参加者はインフラの構築ではなく、Observability の体験そのものに集中できます。 ワークショップの流れ ラボは基本的に任意の個所から開始することが可能ですが、順番に進めることでよりスムーズに理解を深めることができます。 ラボ 1. Application Performance Monitoring による調査 Application Map でサービス間の依存関係とエラー率を一目で把握し、RED メトリクス(Rate, Errors, Duration)から問題のあるサービスを判定、関連するトレースやログを確認していきます。 ラボ 2. Ask AI によるサービス障害調査 OpenSearch Dashboards に組み込まれた Ask AI に、「直近で一番エラーの多いサービスは?」と自然言語で尋ねるところから始めます。Investigation Agent が、複数のデータソースを自律的に横断して根本原因の仮説を立て、その調査過程をノートブックとしてまとめる様子を見ることができます。 ラボ 3. Discover による分析 ログ・トレース・メトリクスを、PPL(Piped Processing Language)や PromQL を活用して横断的に掘り下げて分析します。 ラボ 4. Dashboard に集約する Discover で作成した Visualization を Dashboard に集約し、チームで共有できる運用ダッシュボードを構築します。 ラボ 5. 異常検知・アラート・Forecast OpenSearch の組み込み ML を使い、Anomaly Detection(Random Cut Forest)による異常検知、Alerting による通知、Forecasting による将来の予測を設定します。 その他のラボ 本ワークショップでは、他にも応用的なラボをいくつか提供しています。 例えば、Agent Trace と呼ばれる機能を活用して、Amazon Bedrock 上の Anthropic Claude を使った商品レコメンドエージェントが生成する gen_ai.* スパンを分析し、Agent の思考の流れ(Agent Graph)やトークン使用量の可視化などを体験することができます。 ワークショップの始め方 Workshop Studio にアクセスし、トップページから取り組みたいワークショップを選んでください。 各ワークショップには CloudFormation テンプレートが付属しており、ご自身の環境に必要な AWS リソースを簡単にデプロイすることが可能です。ご自身の AWS アカウントでも、AWS イベント会場で Workshop Studio が払い出す一時アカウントでも実施できます。 リソース展開後は、SageMaker Studio(JupyterLab)もしくは OpenSearch Dashboards にアクセスし、ノートブックや Workshop Studio サイトの手順に沿って学習を進めていきます。 なお、ご自身の AWS アカウントで実施する場合は、OpenSearch・Aurora・SageMaker・EKS などのリソース利用に応じた料金が発生します。ワークショップを終えたら、クリーンアップ手順に従ってリソースを削除してください。 まとめ 今回追加された二つのワークショップはいずれもユースケースに即した内容となっており、一連のラボを通して OpenSearch に関する最新の知見や活用イメージの把握に繋げることができます。 EC サイト検索ワークショップでは、検索機能を段階的に育て、その効果を計測し、機械学習で改善するまでの一連のサイクルを体験できます。OpenSearch Observability Stack ワークショップでは、検索エンジンとは違う一面、Observability のバックエンドとしての OpenSearch と、Agentic AI を取り入れた分析を体験することができます。 検索から Observability、そして AI Agent の活用まで、ハンズオンを通して OpenSearch の可能性に是非触れてみてください。 関連リンク Amazon OpenSearch Service Amazon OpenSearch Service Workshops [Japanese] 前回のブログ:Amazon OpenSearch Service による検索ワークショップ(日本語版)のご紹介 OpenSearch Observability Stack OpenTelemetry Demo ソリューションアーキテクト 榎本 貴之 (X: @tkykenmt )

動画

書籍