TECH PLAY

Go

イベント

該当するコンテンツが見つかりませんでした

マガジン

技術ブログ

目次 はじめに 背景と課題 方式の検討 全体アーキテクチャ 認証・認可フロー クライアントが認可サーバーを発見するまで トークンの取得 ゲートウェイでの JWT 検証 ゲートウェイの実装ポイント 設定ファイルによるバックエンド管理 検証済みユーザー情報の伝搬となりすまし防止 起動時の OIDC Discovery による fail-fast モック認可サーバーによるローカル開発 インフラ構成 社内での活用事例 今後の課題 おわりに はじめに こんにちは。 開発本部開発3部トモニテ開発部所属の庄司( @ktanonymous )です。 エブリーの開発組織では、日常業務から離れて新しい技術やアイデアに挑戦する「挑戦week」という取り組みを定期的に開催しています。 先日行われた挑戦weekの中で、私たちのチームは全社共通で利用できるリモート MCP ( Model Context Protocol ) サーバー向けの認証・認可ゲートウェイを設計・実装しました。 本記事では、その全体アーキテクチャや認証・認可フロー、実装のポイントなどを紹介したいと思います。 ※ 挑戦weekの詳細については過去の記事で紹介していますので、興味のある方は以下をご覧ください。 tech.every.tv 背景と課題 弊社では非エンジニア職にも Claude が配布され、職種を問わず AI 活用が盛んになってきています。 業務の中で Claude を利用するにあたり、社内情報の取得のために社内 API へ接続したいという需要が高まってきていると感じました。 今後、その手段として各チームがリモート MCP サーバーを立てるケースが増えていくと考えられます。 リモート MCP サーバーはインターネット経由でアクセスされるため、社外の人間が利用できないように認証・認可を考慮する必要がありました。 一方で、MCP サーバーを実装するたびに各チームが認証・認可を設計・実装するのはコスト面でも効率面でも避けたいものです。 そこで、共通で利用できる認証・認可基盤を作り、各チームの MCP サーバーはそれぞれのツールの実装に専念できるようにすることを目指しました。 今回検討した要件は以下の通りです。 Claude (Web / Desktop / Code) などの各種 MCP クライアントから、社内のリモート MCP サーバーのツールを利用できる 弊社の Google Workspace アカウントでログイン済みのユーザーのみがツールを利用できる 各チームが新しく MCP サーバーを追加するとき、認証・認可の実装を不要にする 方式の検討 認証・認可の共通化にあたり、大きく分けて以下 3 つのアプローチを検討しました。 ライブラリ方式: 認証・認可処理を共通ライブラリとして実装し、各 MCP サーバーに組み込む サーバー方式: 認可サーバーを立て、各 MCP サーバーがトークンを問い合わせることで検証する ゲートウェイ方式: ゲートウェイがリクエストを一括で受けて認証・認可を行い、検証済みのリクエストだけを後段の MCP サーバーへ転送する 3 つの内、ライブラリ方式は MCP サーバーの実装言語ごとにライブラリを用意する必要があり、 サーバー方式は、責務こそ分離できるものの、各 MCP サーバー側に認可サーバーへトークンを問い合わせる実装が必要になります。 一方、ゲートウェイ方式であれば、認証・認可をゲートウェイに一元化でき、後段の MCP サーバーは実装言語を問わず認証・認可も意識せずに済みます。 近年の MCP ゲートウェイ製品の設計とも方向性が近いことから、今回はゲートウェイ方式を採用しました。 なお、 mcp-context-forge や agentgateway といった既存の OSS MCP ゲートウェイも調査しましたが、 今回の要件に対して機能・ボリュームが大きすぎたため採用を見送り、必要最小限のゲートウェイを実装することにしました。 全体アーキテクチャ 全体のアーキテクチャは以下の通りです。 アーキテクチャ概要 今回実装したゲートウェイは大きく 4 つの要素から構成されています。 MCP クライアント: Claude Code / Claude Desktop など。OAuth 2.0 のパブリッククライアントの立ち位置 ゲートウェイ: ECS Fargate 上で稼働する Go (Echo) 製のリバースプロキシ。JWT の検証と後段 MCP サーバーへのルーティングを担う。クライアントから直接見える唯一の MCP サーバー。 Amazon Cognito: 認可サーバー兼 IdP。Google Workspace アカウントへのフェデレーションを行い、アクセストークン (JWT) を発行する MCP サーバー群: 各チームが実装するリモート MCP サーバー。private subnet に配置し、ゲートウェイ経由でのみアクセス可能であり、クライアントから直接的には見えていない 意識した点として、JWT を検証するのはゲートウェイだけという点があります。 後段の MCP サーバーは JWT 検証を実装せず、ゲートウェイが注入する検証済みのユーザー情報 (後述の X-Auth-* ヘッダー) を信頼します。 その前提を成立させるため、MCP サーバーはインターネットに直接公開せず、ゲートウェイからのみ到達できるネットワーク構成としました。 認証・認可フロー MCP の認可は MCP Authorization 仕様 で定義されており、 OAuth 2.1 をベースに、PRM (RFC 9728) などの仕様を組み合わせて構成されています。 今回のゲートウェイもこの仕様に沿って実装しています。 具体的には以下のようなフローとなっています。 認証・認可フロー クライアントが認可サーバーを発見するまで MCP クライアントには MCP サーバーの接続先を登録するため、認可サーバーがどこにあるかを予め知ることはできません。 クライアントがトークンなしでアクセスすると、ゲートウェイは 401 Unauthorized を返し、 WWW-Authenticate ヘッダーの resource_metadata パラメータで Protected Resource Metadata (PRM) の URL を通知します。 HTTP/1.1 401 Unauthorized WWW-Authenticate: Bearer resource_metadata="https://mcp-gateway.example.com/.well-known/oauth-protected-resource" PRM は、保護されたリソース (今回はゲートウェイ) が「どの認可サーバーに保護されているか」「どのスコープをサポートするか」といった自身のメタデータを公開するための仕様です。 クライアントはこのメタデータを参照することで、トークンの取得先を機械的に発見できます。 クライアントがこの URL にアクセスすると、PRM の仕様で定義された以下のような JSON が返ります。 { " resource ": " https://mcp-gateway.example.com/ ", " authorization_servers ": [ " https://cognito-idp.ap-northeast-1.amazonaws.com/<user-pool-id> " ] , " scopes_supported ": [ " openid ", " email ", " profile " ] , " bearer_methods_supported ": [ " header " ] } クライアントは authorization_servers から認可サーバー (Cognito) を発見し、 さらに Cognito の Authorization Server Metadata ( /.well-known/openid-configuration ) を取得して、 認可エンドポイントやトークンエンドポイントを把握します。 Authorization Server Metadata は、認可サーバーが「どこで認可リクエストやトークン発行を受け付けるか」「どの機能をサポートするか」といった自身の設定情報を公開するための仕様です。 Cognito の場合、以下のような JSON が返ります (主要なフィールドのみ抜粋)。 { " issuer ": " https://cognito-idp.ap-northeast-1.amazonaws.com/<user-pool-id> ", " authorization_endpoint ": " https://<domain>.auth.ap-northeast-1.amazoncognito.com/oauth2/authorize ", " token_endpoint ": " https://<domain>.auth.ap-northeast-1.amazoncognito.com/oauth2/token ", " jwks_uri ": " https://cognito-idp.ap-northeast-1.amazonaws.com/<user-pool-id>/.well-known/jwks.json ", " scopes_supported ": [ " openid ", " email ", " phone ", " profile " ] } この 2 段階のメタデータ取得により、クライアント側に認可サーバーの情報を事前設定することなく、OAuth フローを開始することができます。 トークンの取得 認可サーバーの発見後は、通常の OAuth 2.0 Authorization Code フロー (PKCE 付き) です。 クライアントがブラウザを開いて Cognito の Hosted UI に遷移し、Cognito は Google へフェデレーションします。 ユーザーが自社の Google Workspace アカウントでログインすると、クライアントはアクセストークン (JWT) を取得します。 Cognito 側の設定のポイントは以下の通りです。 パブリッククライアント + PKCE 必須: Claude などの MCP クライアントは利用者の手元で動くため、クライアントシークレットを保持できません。そこでパブリッククライアントとして登録し、PKCE を利用するようにします。 アプリクライアントの事前登録: Cognito では接続元のアプリケーションを「アプリクライアント」として登録します。MCP クライアントごとにユーザープールを登録し、対応するコールバック URL (認可コードの返却先) を設定することで事前に利用するクライアントを登録します。 なお、MCP Authorization 仕様では、クライアントの登録方法として事前登録のほかに、 Client ID Metadata Documents (URL を client_id として扱い、認可サーバーがその URL からクライアント情報を取得する方式) や Dynamic Client Registration (RFC 7591) による動的登録も定義されています。 今回は、Cognito がこれらに対応していないことと、社内利用ではクライアントの種類が限られる (基本的に Claude 系のみ) ことから、仕様でも正規の選択肢とされている事前登録制を採用しました。 また、Cognito のアクセストークンには標準では email クレームが含まれないため、 Pre Token Generation Lambda トリガー を利用して、トークン生成時に email クレームを注入しています。 これにより、後段の MCP サーバーが「誰からのリクエストか」をメールアドレスで判定できるようになります。 ゲートウェイでの JWT 検証 ゲートウェイは、リクエストごとに JWT を検証します。 署名検証には github.com/coreos/go-oidc (v3.18.0) を利用し、 Cognito の JWKS (JSON Web Key Set) は初回取得後にキャッシュされます。 検証項目は以下の通りで、署名・有効期限といった基本的な検証に加えて、クレームベースのチェックを重ねています。 検証項目 内容 失敗時 署名 / iss / exp JWKS による署名検証、発行者・有効期限の確認 401 token_use "access" であること (ID トークンの誤用防止) 401 client_id 事前登録したアプリクライアントの許可リストに含まれること 403 email ドメイン エブリードメインであること 403 scope 必須スコープを満たすこと (設定時のみ) 403 token_use の検証は Cognito 固有のポイントです。 Cognito は ID トークン用とアクセストークン用にそれぞれ別の署名鍵を持ちますが 1 、両方の公開鍵が同一の JWKS で公開されます。 そのため、JWKS 内のいずれかの鍵で署名が検証できることだけを条件にすると、ID トークンも検証を通過してしまいます。 クライアントが誤って ID トークンを Authorization ヘッダーに載せてきた場合に備えて、 token_use クレームが "access" であることを確認しています。 なお、検証項目に aud (audience) クレームは含まれていません。 MCP Authorization 仕様ではリソースサーバーによるトークンの audience 検証が求められていますが、 Cognito のユーザープールが発行するアクセストークンには aud クレームが含まれず、代わりに client_id クレームが含まれます。 そのため、今回の実装では client_id の許可リスト検証によって、トークンが事前登録済みのクライアントに発行されたものであることを確認する形をとっています。 ゲートウェイの実装ポイント ゲートウェイ本体は Go (1.26.3) + Echo (v4.15.2) で実装しました。 ここでは設計上のポイントについて触れます。 設定ファイルによるバックエンド管理 ゲートウェイがバイパスする MCP サーバー (バックエンド) は YAML で宣言的に管理しています。 backends : - name : server-a url : http://server-a.internal:8081 - name : server-b url : http://server-b.internal:8082 ゲートウェイは起動時にこのファイルを読み込み、 /<name>/mcp というパスを各バックエンドの /mcp にマッピングします。 たとえば POST /server-a/mcp へのリクエストは http://server-a.internal:8081/mcp に転送されます。 プロキシ部分は Go 標準ライブラリのリバースプロキシをベースに、以下のように実装しています。 標準実装は転送先のホストを書き換えるだけでパスはそのまま転送するため、転送直前に呼ばれるリクエストの書き換え処理を拡張して、パスプレフィックスの除去を加えています。 func newBackendProxy(targetURL, stripPrefix string ) (*httputil.ReverseProxy, error ) { u, err := url.Parse(targetURL) if err != nil { return nil , fmt.Errorf( "parse %q: %w" , targetURL, err) } rp := httputil.NewSingleHostReverseProxy(u) originalDirector := rp.Director rp.Director = func (req *http.Request) { // 標準の書き換え処理 (転送先を u に向ける) を実行した上で、 // ゲートウェイ側のパスプレフィックスを除去 (/server-a/mcp → /mcp) originalDirector(req) req.Host = u.Host if stripPrefix != "" { req.URL.Path = strings.TrimPrefix(req.URL.Path, stripPrefix) if req.URL.Path == "" { req.URL.Path = "/" } } } // バックエンドに到達できない場合は 502 を返す rp.ErrorHandler = func (w http.ResponseWriter, r *http.Request, err error ) { log.Printf( "[proxy] upstream error %s %s: %v" , r.Method, r.URL.Path, err) http.Error(w, "bad gateway" , http.StatusBadGateway) } return rp, nil } 新しい MCP サーバーを追加したいチームは、サーバーをデプロイしてこの YAML に 1 エントリ追記するだけで、 認証・認可付きのリモート MCP サーバーを公開できます。 検証済みユーザー情報の伝搬となりすまし防止 ゲートウェイは JWT の検証後、 Authorization ヘッダーを除去し、検証済みのクレームを X-Auth-* ヘッダーとしてバックエンドへのリクエストに注入します。 X-Auth-Sub : ユーザー識別子 (sub クレーム) X-Auth-Email : メールアドレス (Pre Token Generation Lambda で注入した email クレーム) X-Auth-Client-Id : OAuth クライアント ID X-Auth-Scope : 許可されたスコープ一覧 このとき重要なのが、クライアントから送られてきた X-Auth-* ヘッダーを必ず削除してから再注入することです。 // 受信した Authorization / X-Auth-* を全て削除(なりすまし防止) stripIncomingAuthHeaders(req.Header) // 検証済みクレームから X-Auth-* を再注入 req.Header.Set( "X-Auth-Sub" , claims.Sub) req.Header.Set( "X-Auth-Email" , claims.Email) req.Header.Set( "X-Auth-Client-Id" , claims.ClientID) req.Header.Set( "X-Auth-Scope" , strings.Join(claims.Scopes, " " )) これにより、クライアントが偽の X-Auth-Sub を付けてリクエストすることで他人になりすますのを防ぎます。 ゲートウェイがクレームヘッダーの削除と再注入を強制するため、バックエンド側は常にゲートウェイで検証済みの X-Auth-* ヘッダーの利用を保証できます。 加えて、バックエンドにはゲートウェイ経由でしか到達できないようにネットワークを構成しています (後述)。 X-Auth-* を信頼できるのは「ゲートウェイを必ず通る」ことが前提なので、アプリケーション実装とネットワーク構成をセットで設計する必要があります。 起動時の OIDC Discovery による fail-fast ゲートウェイは起動時に Cognito へ OIDC Discovery ( /.well-known/openid-configuration の取得) を行います。 issuer の設定ミスなどがあればこの時点で起動エラーになるため、リクエストを受けてから認証エラーが多発する、という事態を防げます。 モック認可サーバーによるローカル開発 ローカル開発用に、OIDC Discovery・JWKS・トークン発行だけを備えた最小限のモック認可サーバーを用意しました。 ゲートウェイから見ると issuer の URL が違うだけなので、実際の Cognito と同じコードパスで JWT 検証まで通しでテストできます。 docker compose でゲートウェイ・サンプルバックエンド・モック認可サーバーを一括起動できるようにしており、AWS 環境なしで認証フロー全体を確認できます。 インフラ構成 インフラは Terraform で管理しています。要点は以下の通りです。 ECS Fargate: ゲートウェイは private subnet に配置し、外部への通信は NAT Gateway 経由 ALB: TLS を終端し、ゲートウェイへ転送。セキュリティグループでゲートウェイへの入力は ALB からのみに制限 Cognito User Pool: Google フェデレーション、アプリクライアント、Resource Server、Pre Token Generation Lambda を Terraform で定義 デプロイ: GitHub Actions から OIDC でロールを引き受けて ECR push と ECS デプロイを実行 MCP クライアント (Claude など) は固定 IP を持たないため、ALB は HTTPS (443) を全公開とし、アクセス制御は JWT 検証と WAF に任せます。 バックエンドの MCP サーバーは private subnet 内でゲートウェイからのみ到達できるようにし、ゲートウェイを経由しない場合はネットワーク的に到達不可能にしています。 社内での活用事例 実際に社内で Redash を操作するリモート MCP サーバーをゲートウェイを利用して社内向けにリリースされました。 Claude とのチャットだけで、クエリの実行やダッシュボードの操作といった Redash 上のほとんどの操作ができます。 ローカル版の Redash MCP サーバーについて、以前の挑戦weekの記事で紹介しています。 tech.every.tv Redash MCP サーバーをリモート化し、認証・認可をゲートウェイに集約することで、個人ごとに API Key などの配布や登録が不要になり、 ビジネスサイドのメンバーであっても、Redash アカウントを持っていれば Google アカウントにログインするだけで API 経由で Redash を操作できるようになりました。 Redash MCP を社内に公開しました 今後の課題 短期間での構築だったため、以下のような課題が残っています。 きめ細かなポリシー管理: 現状は「自社ドメインの社員であること」の確認までで、厳密に権限管理をする場合には、所属チームなどの属性に応じて利用できる MCP サーバーやリソースを制限する仕組みが必要になります。 VPC 間の接続: 各チームの MCP サーバーは別の VPC や AWS アカウントで稼働するケースもあります。そのため、今後さらに MCP サーバーの利用を展開していくためには、VPC Peering などによる VPC 間接続を検討する必要があります。 おわりに 本記事では、全社共通のリモート MCP サーバー向け認証・認可ゲートウェイの設計と実装を紹介しました。 ゲートウェイ方式を採用したことで、認証・認可の実装をゲートウェイに一元化でき、 各チームは MCP サーバーのツール実装に専念して、設定ファイルへの追記だけで認証付きのリモート MCP サーバーを公開できるようになりました。 MCP の認可まわりは仕様の整備が活発に進んでいる領域なので、今後も動向を追いながら基盤を育てていきたいと思います。 この記事が、社内での AI 活用の中で同じような課題感を持っている方の参考になれば幸いです。 最後まで読んでいただき、ありがとうございました。 Understanding user pool JSON web tokens (JWTs) - Amazon Cognito (2026年6月11日閲覧) 。「Amazon Cognito generates two pairs of RSA cryptographic keys for each user pool. One private key signs access tokens, and the other signs ID tokens.」と記載されています。 ↩
Antigravity 2.0とは何か? Antigravity 2.0は、Googleが提供するエージェントファーストの開発プラットフォームです。Antigravity 2.0はAIエージェントの「セントラル・コマンドセンター」として機能し、アクティビティの起動、モニタリング、オーケストレーションを行う役割を担っています。
こんにちは。メルペイでソフトウェアエンジニアをしている @sapuri です。この記事は Merpay & Mercoin Tech Openness Month 2026 の 9日目の記事です。 はじめに 本記事は、2026年4月27日の Background Job Talk 〜 Temporal 活用と独自実装の舞台裏編〜 で発表した「内製ワークフローエンジンの設計とメルカリでの活用事例」を記事化したものです。 マイクロサービスアーキテクチャのような分散システムでは、複数のサービスにまたがる処理のデータ整合性をどう保つか、いわゆる分散トランザクションの扱いが大きな課題となります。 メルカリでは、この課題を Saga パターンによる結果整合性で解決するために、自社でワークフローエンジンを開発して運用しています。 このワークフローエンジンは、もともとメルコインの決済基盤における分散トランザクション管理のために開発したものです。メルペイの Payment Service で得た知見も取り入れながら設計し、現在はメルカリグループ内の複数のユースケースで利用が広がっています。 この記事では、内製に至った背景とワークフローエンジンの具体的な設計、社内での活用事例について紹介します。 分散トランザクション管理の課題と Saga パターン メルカリでは主にマイクロサービスアーキテクチャを採用しています。 そのため、お客さまがアプリで1つの操作をすると、そのリクエストは基本的に複数のサービスをまたいで処理されます。 例えば、メルカリアプリからビットコインを購入するときの決済リクエストでは、取引データの作成、メルコインの日本円残高の減算、メルペイのポイントの減算、ビットコイン残高の加算、取引データの更新といった複数の処理が関わります。 この決済リクエストは、1つのトランザクションとして扱う必要があります。つまり、一連の処理をすべて成功させるか、すべて失敗させるかのどちらかに寄せる必要があります。 しかし、各サービスがそれぞれデータベースを持っているため、単純にロールバックすることはできません。この点を考慮せずに実装すると、エラーのタイミングによってデータの不整合が発生します。 例えば、このような不整合が起こりえます。 決済が失敗したのにメルコインの日本円残高が減っている 残高は減ったがビットコイン残高が加算されない ビットコインと交換できているのに取引が完了扱いになっていない また、Two-Phase Commit のような分散トランザクションでは長期間リソースをロックするため、サービスの可用性が下がる可能性があります。 そのため、メルカリでは結果整合性のアプローチで、このような分散トランザクションを解決しています。 Saga この結果整合性を実現するためのアーキテクチャの1つとして、Saga というパターンがあります。 Saga は、トランザクションを複数の小さなトランザクションに分割して順次実行することで長時間のロックを不要にします。途中でリトライ不可能なエラーが出た場合は、成功済みの処理に対する補償トランザクションを逆順で実行します。 先ほどの暗号資産購入の例で、途中のビットコイン残高を増やす処理でリトライできないエラーが発生した場合を考えます。 この場合、この時点までに成功した処理を取り消す補償トランザクションを逆順に実行します。すでにメルコインの残高とメルペイのポイントが減らされているので、まずポイントを戻し、その次にメルコイン残高を戻し、最後に取引データを失敗として更新します。 このように実装することで、途中のどこで失敗しても結果整合性を保って処理を完了させることができます。 このあたりの話は以前の記事でも紹介しているので、興味のある方はそちらもぜひご覧ください。 メルコイン決済基盤における分散トランザクション管理 | メルカリエンジニアリング ワークフローエンジンの検討 実装の方針が決まったので、実際に Saga パターンを実装するためにワークフローエンジンの導入を検討しました。 主に検討したツールは、 GCP Workflows 、 Cadence 、 Temporal です。メルカリでは主に GCP を使ってサービスを構築しているため、まず GCP Workflows を検討しました。ただ、各処理を HTTP のエンドポイントとして実装する必要があり、ユニットテストがやりにくいという懸念がありました。また、YAML ではなく Go のコードでワークフローを記述したいという要望もありました。 Cadence と Temporal も検討しましたが、メルカリでは Cloud Spanner をメインに使っているため、Spanner に対応していなかったことから採用できませんでした。また、Temporal はシステムの規模が大きく、仕組みも比較的複雑なため、運用面にも不安がありました。 このように、既存ツールでは要件を満たせなかったため、自社でワークフローエンジンを開発することにしました。 開発時には、Cadence / Temporal のインターフェースの良さを取り入れつつ、メルペイの Payment Service ですでに実績があった「DB への実行状態の永続化 x インメモリキュー x Worker での実行管理」のアーキテクチャを再利用する方針にしました。また、Go 専用で必要な機能のみに絞ることで、数人の兼務メンテナーでも運用できる規模にしています。 ワークフローエンジンの設計 アーキテクチャ このワークフローエンジンは、アプリケーションサーバーと同じ Pod でデプロイされることを想定しています。 Go runtime で動作し、利用者は SDK として扱います。 アプリケーションは Manager というインターフェースを使ってワークフローエンジンを操作します。主に Register と Execute という2種類のインターフェースを使います。 アプリケーションはワークフローを普通の Go の関数として実装するので、その関数の内容を事前にワークフローエンジンに登録する必要があります。 manager.RegisterWorkflow() が呼び出されると、Manager は Registry というインメモリの領域に関数を格納します。 manager.Workflow().Execute() は、実際にワークフローを実行するインターフェースです。 呼び出されると、Manager は Engine Server という gRPC サーバーに対して Workflow や Activity を作成するリクエストを送ります。 Engine Server は関数名や引数、実行状態を DB に保存し、インメモリキューである Channel に WorkflowStarted イベントを publish します。 その後、Worker という goroutine が WorkflowStarted イベントを subscribe し、Registry から実行する関数を取得して、Go のリフレクションを使って実行します。 実行が完了すると Worker は Engine Server に完了を報告し、Engine Server は結果を保存して WorkflowCompleted イベントを publish します。 その後、Worker が WorkflowCompleted イベントを subscribe し、アプリケーションに関数の実行結果を返却します。 もしその結果がエラーだった場合は、後述する ErrorMarshaler というインターフェースで、その Workflow を完了させるかどうかを判定します。 ここまで説明したコンポーネントの役割を整理します。 Manager : SDK のエントリーポイント。アプリケーションは Workflow() 、 Activity() 、 RegisterWorkflows() などを呼び出します。 Engine Server : Create、Complete、List などの gRPC API を提供するサーバー。DB に Workflow や Activity の I/O と状態を保存します。 Channel : Workflow や Activity の状態遷移イベントのハブとなるインメモリキュー。 Workers : Workflow や Activity を実行する goroutine 群。Channel から状態遷移イベントを購読し、イベントの種別に応じた処理を実行します。 Registry : Register された関数をインメモリで保持します。 Recovery Worker : Engine Server に対して定期的に未完了の Workflow と Activity を List してリトライします。 コードサンプル アプリケーション側の実装イメージは次のようになります。 func (s *Service) createExchangeWorkflow(ctx context.Context, params *CreateExchangeParams) (*CreateExchangeResult, error) { saga := workflow.NewSaga(s.wm) if err := s.wm.Activity(s.authorizeBalance, params.Balance).ExecuteWait(ctx); err != nil { return nil, err } saga.AddCompensation(s.cancelBalance, params.Balance) if err := s.wm.Activity(s.authorizePoint, params.Point).ExecuteWait(ctx); err != nil { if !isCompletableError(err) { return nil, err } if cerr := saga.Execute(ctx, func(e execution.Execution) error { return e.Wait(ctx) }); cerr != nil { return nil, fmt.Errorf("failed to execute compensation activities: %w, orig_err: %v", cerr, err) } return nil, err } return &CreateExchangeResult{}, nil } まず、 createExchangeWorkflow という関数が定義されています。 この関数は、残高を確保する authorizeBalance という Activity と、ポイントを確保する authorizePoint という Activity を順に実行して結果を返す処理です。 特徴的なのは、それぞれの Activity を実行した直後に、この SDK が提供する Saga の AddCompensation インターフェースで補償トランザクションを登録している点です。 これにより、 authorizeBalance Activity が成功した後に authorizePoint Activity が失敗した場合は、 authorizeBalance を取り消す処理である cancelBalance という関数が補償トランザクションとして実行されます。 エラーハンドリング このワークフローエンジンでは、3種類のエラーを定義しています。 Completable Error : Workflow や Activity を失敗として完了させてよい、想定されたエラーです。例として、残高不足や利用制限があります。 Retryable Error: リトライ対象のエラーです。 Incompletable Error: Workflow を完了させずに停止し、Recovery Worker が後でリトライするエラーです。 Completable Error は、明示的に完了できるエラーだけを完了扱いにするための仕組みです。 クライアント側で ErrorMarshaler というインターフェースを実装したエラーとして定義される 該当しないエラーはすべて未完了として実行を停止し、Recovery Worker によってリトライされる 明示的に Completable Error を返さない限り Workflow は完了しない アプリケーションが意図していない異常な状態で Workflow が完了しない設計になる type ErrorMarshaler interface { MarshalCompletableError(error) ([]byte, error) UnmarshalCompletableError(marshaledErr []byte) error } 具体例として、ドメインのカスタムエラー型に Completable() というメソッドを定義し、それを使って ErrorMarshaler を実装します。 このようなカスタムエラー型を作っておくことで、ビジネスロジックで特定のエラーコードを含むエラーを返すと、ワークフローエンジンで完了可能なエラーとして処理されます。 type Error struct { code ErrorCode msg string } func (e *Error) Error() string { return e.msg } func (e *Error) Completable() bool { return e.code == ErrCodeCompletable } type workflowError struct { Code ErrorCode Msg string } type workflowErrorMarshaler struct{} func (workflowErrorMarshaler) MarshalCompletableError(err error) ([]byte, error) { var aerr *Error if !errors.As(err, &aerr) { return nil, err } if !aerr.Completable() { return nil, err } return json.Marshal(&workflowError{ Code: aerr.code, Msg: aerr.msg, }) } func (workflowErrorMarshaler) UnmarshalCompletableError(data []byte) error { var werr workflowError if err := json.Unmarshal(data, &werr); err != nil { return err } return &Error{ code: werr.Code, msg: werr.Msg, } } 活用事例 ここまで、独自に開発したワークフローエンジンの設計について紹介しました。 ここからは、決済以外のユースケースも含めて、社内での活用事例を3つ紹介します。 1. メルカリモバイル: 同期レスポンスと非同期処理の分離 例えば、メルカリモバイルの回線開通フローでは、Workflow と Child Workflow というサブのワークフローを定義し、同期レスポンスと非同期処理を分離しています。 具体的には、Workflow と Child Workflow で役割を分けています。Workflow は回線開通リクエストを DB に保存し、Child Workflow を fire-and-forget で起動してレスポンスを返します。 Child Workflow は、非同期で Pub/Sub イベントを発行します。失敗した場合は、Recovery Worker が Child Workflow を復旧します。 これにより、クライアントに即座にレスポンスを返しつつ、Pub/Sub 発行のような後続処理がバックグラウンドで実行されることを保証できます。 2. メルカリ グローバル EC 基盤: Saga によるチェックアウト管理 メルカリ グローバル EC 基盤では、Spanner ではなく PostgreSQL を採用しています。 ここでは、購買代行パートナーを経由して海外のお客さまが日本のメルカリの商品を購入する処理を例にします。 例えばチェックアウト確定フローでは、クーポン消費、注文作成、購買代行パートナーへの注文連携、注文確定通知送信の順に処理が進みます。 このチェックアウトの処理で、冒頭で紹介した暗号資産購入のユースケースと同様に Saga パターンを使ってトランザクションを管理しています。 途中で失敗した場合には、Saga による補償トランザクションでキャンセルします。 内製のワークフローエンジンを採用した理由には、社内にすでにある類似実装や運用基盤を活用したかったことがあります。また、メンテナーが社内にいるため直接サポートを受けられることや、必要な機能を柔軟に追加できて最適化しやすいことも大きな理由でした。 3. eKYC: Signal を使った long-running workflow eKYC によるお客さまの本人確認フローは、Workflow の中で数時間から数日の審査待ちが発生するという、いわゆる long-running workflow になっています。 これを実現するために、Temporal でも提供されている Signal という機能を実装しました。 Signal を使うことで、Workflow を中断し、外部から Workflow に情報を送って再開させるユースケースを実現できます。 長期フローを細切れのジョブとして分割せず、書類検証、審査待ち、承認または拒否という一連の流れを1本の Workflow として表現できます。 Signal を使ったアプリケーション側の実装イメージは次のようになります。 type ApprovalSignalParams struct { Approved bool RejectReason string } func (s *Service) approvalWorkflow(ctx context.Context, req *ApprovalRequest) (*ApprovalResult, error) { var result *ApprovalResult if err := s.wm.Activity(s.verifyDocument, req).ExecuteGet(ctx, &result); err != nil { return nil, err } var params ApprovalSignalParams if err := s.wm.Signal("approval").Receive(ctx, &params); err != nil { return nil, err } if !params.Approved { return nil, newCompletableError(params.RejectReason) } return result, nil } func (s *Service) handleApproval(ctx context.Context, workflowIdempotencyKey string, params ApprovalSignalParams) error { return s.wm.Signal("approval").Send(ctx, workflowIdempotencyKey, params) } approvalWorkflow という関数は、 verifyDocument という Activity を実行した後に、 approval という signal を待機します。 審査が終わり、外部から approval signal をこの Workflow に送信すると、 approvalWorkflow が途中から再開します。 まとめ 分散システムにおけるデータ整合性の課題に対して、メルカリでは Saga パターンによる結果整合性を採用し、それを支える仕組みとしてワークフローエンジンを内製しています。 この記事では、その背景と設計、社内での活用事例について紹介しました。 なお、この SDK を使った開発を支援するために、専用の静的解析ツールも開発して運用しています。このあたりの運用面についても発表で触れているので、興味のある方は Speaker Deck のスライドもぜひご覧ください。 次の記事は kubomiさんの「Build First, Discuss Later|初回ミーティングに動くプロトタイプを持ち込んだら、意思決定が爆速になった」です。引き続きお楽しみください。

動画

該当するコンテンツが見つかりませんでした

書籍