TECH PLAY

OSS

イベント

マガジン

技術ブログ

はじめに 弊社では LLM を活用した機能開発の観測基盤として Langfuse をセルフホストで運用しています。Langfuse は LLM アプリケーションのトレーシングやプロンプト管理等に活用できるオープンソースの LLM エンジニアリングプラットフォームです。 さてこの種のサービスには認証の課題がつきまといます。Langfuse は標準でメールアドレス / パスワードによるログインが可能ですが、弊社の方針として開発組織内で利用、管理しているアプリケーションやサービス群には IAM Identity Center を IdP とする SSO を採用しています。 Redash や SendGrid といったサービスで既にこの方式を採用しており、Langfuse でも同様に IAM Identity Center 経由の SSO ログインを実現したいと考えました。 ところが Langfuse は IAM Identity Center を IdP として直接サポートしていません。Langfuse の認証は NextAuth.js ベースであり、対応する認証方式は OAuth / OIDC が中心です *1 。では IAM Identity Center の OIDC 機能をそのまま使えるかというと、そう単純ではありません。IAM Identity Center が OIDC を提供するにはアプリケーション側が trusted token issuer に対応している必要があり、Langfuse はこれに該当しないため直接利用できません。 Cognito と Langfuse の SSO 連携については 既に優れた先例 がありますが、ここに IAM Identity Center を加えた構成についての情報はあまり無いようです。ニッチな話題かもしれませんが、同様の課題を抱えている方もいるのではと考え、本稿にて事例を紹介できればと思います。 構成 前述の事情を踏まえ、間に Amazon Cognito を挟む構成としました。Cognito は SAML によって IdP からの認証情報を受け取り、これを OIDC にてアプリケーション(今回の場合は Langfuse)に提供する役目を担います。IAM Identity Center は SAML による外部アプリケーション(今回の場合は Cognito)のフェデレーションをサポートしているので、これを組み合わせると以下のような構成が実現できます。 IAM Identity Center --- (SAML) ---> Cognito --- (OIDC) ---> Langfuse 認証フローを時系列で描くと以下のようになります。 sequenceDiagram autonumber participant User as ユーザー (Browser) participant LF as Langfuse participant Cog as Cognito participant IDC as IAM Identity Center User->>LF: SSO ログインボタンをクリック LF->>User: Cognito のログイン URL へリダイレクト User->>Cog: 認証リクエスト (OIDC) Cog->>User: IAM Identity Center へリダイレクト User->>IDC: SAML 認証リクエスト IDC->>User: ログイン画面の表示 / 認証の実施 IDC->>User: SAML Assertion を発行 User->>Cog: SAML Assertion をポスト (ACS URL) Cog->>Cog: SAML Assertion の検証 / ユーザー情報の処理 Cog->>User: Langfuse へリダイレクト (認可コード) User->>LF: 認可コードを送信 LF->>Cog: トークン交換リクエスト Cog->>LF: ID トークン / アクセストークン (OIDC) LF->>User: ログイン完了 要するに Cognito が SAML -- OIDC 間の橋渡し役を担う格好です。ユーザー視点では Langfuse のログイン画面で SSO ボタンを押すと IAM Identity Center のログイン画面に遷移し、認証後 Langfuse に戻ってきます。 これを実現する構成が以下のようになります。本稿の趣旨は Langfuse 利用にかかる SSO ログイン化が主軸につき、Langfuse 本体の構成はかなり端折った図である点ご容赦ください *2 構成図。admin / langfuse / logging / security は AWS アカウント名の意 なお上図における logging / security 各アカウントは以下拙稿における member / security に対応するものです。主に SSO ログイン時の監査に使われる周辺要素であり、本稿では詳細を割愛します。 設定の手順 設定は大きく4つのステップに分かれます。 IAM Identity Center に SAML アプリケーションを登録する(admin アカウント) Cognito で SAML IdP と OIDC クライアントを構成する(langfuse アカウント) Langfuse に OIDC 関連の環境変数を設定する(langfuse アカウント) SSO ログインへの一本化(langfuse アカウント) 以下、各ステップの内容を解説します。 Step 1: IAM Identity Center への SAML アプリケーション登録 admin アカウントの IAM Identity Center に、Langfuse 用のカスタム SAML アプリケーション( customer-managed application )を Terraform で登録します *3 。 aws_ssoadmin_application リソースで application_provider_arn に custom-saml を指定し、IAM Identity Center のポータル上での可視性を有効にします。 あわせて aws_ssoadmin_application_assignment で SSO ログインを許可する対象を紐付けます。グループ単位での割り当ても可能ですが、弊社では職責や担当職務に応じて柔軟にアクセス範囲を制御したかったため、ユーザー単位での割り当てとしました *4 。 続いてマネジメントコンソールから Application metadata を設定します。これは Cognito が SAML Assertion を正しく受け取るために必要な設定です。 Application ACS URL : https://<Cognito ドメイン>.auth.<リージョン>.amazoncognito.com/saml2/idpresponse Application SAML audience : urn:amazon:cognito:sp:<Cognito User Pool ID> ACS URL は Cognito User Pool のドメイン名から、SAML audience は Cognito User Pool の ID からそれぞれ導出されます。Step 2 で Cognito User Pool を作成した後に設定する、という順序になります *5 。 同じくマネジメントコンソールから Attribute mappings を設定します。SAML Assertion に含めるユーザー属性と Cognito 側の属性との対応を定義するものです。 User attribute in the application Maps to this string value or user attribute in IAM Identity Center Format Subject(変更不可) ${user:email} persistent email ${user:email} basic email_verified true unspecified name ${user.name} unspecified email_verified を固定値 true としているのは弊社の運用上 IAM Identity Center で管理されているユーザーのメールアドレスは検証済みであると見なして差し支えないためです。これは以下の拙稿に詳細を譲りますが、弊社の IAM Identity Center は IdP として Entra ID を参照しており、Entra ID 上のユーザ情報を信頼できるケースに該当する為です。 最後に、マネジメントコンソールから SAML メタデータファイルをダウンロードします。 IAM Identity Center コンソールへ移動 Application assignments → Applications を選択 Customer managed タブへ移動 対象の SAML アプリケーションを選択 IAM Identity Center metadata 内の IAM Identity Center SAML metadata file からダウンロード このメタデータ XML ファイルは次のステップで Cognito に渡す必要があります。 Step 2: Cognito の構成 langfuse アカウント側では Cognito User Pool を作成し、IAM Identity Center を SAML IdP として登録した上で、Langfuse 向けの OIDC クライアントを構成します。 まず Step 1 でダウンロードした SAML メタデータ XML を S3 バケットに手動でアップロードし、Terraform からは data "aws_s3_object" で参照します *6 。S3 バケット自体も Terraform で作成し、サーバーサイド暗号化とパブリックアクセスブロックを設定しています。 続いて、この SAML メタデータを使って aws_iam_saml_provider と aws_cognito_identity_provider の2つを設定します。前者は IAM レベルでの SAML プロバイダ登録、後者は Cognito User Pool に対する SAML IdP の登録です。 aws_cognito_identity_provider では attribute_mapping で Step 1 の Attribute mappings との対応を指定します。 なお provider_details には ignore_changes を設定しています。Cognito は MetadataFile を解釈して内部的にいくつかの属性を展開するため、内容に変更がなくても terraform plan で差分が出続けます。これを抑制するための措置です。 次に aws_cognito_user_pool_client を設定します。これは Langfuse から見た OIDC クライアントに相当するリソースです。OAuth フローには Authorization Code Grant( code )を、スコープには openid / email / profile を指定します。 callback_urls には Langfuse の OIDC コールバック URL( https://<Langfuse のドメイン>/api/auth/callback/custom )を設定します。トークンの有効期間やクライアントシークレットの生成など、必要な設定を適宜入れます。 最後に、Cognito が払い出した Client ID / Client Secret / Issuer URL を aws_ssm_parameter (SecureString)で SSM パラメータストアに格納し、Langfuse の ECS タスク定義から参照できるようにします。 Step 3: Langfuse への環境変数設定 Langfuse は Custom OAuth Provider としての設定を環境変数で受け取ります。ECS タスク定義に以下の環境変数を追加しました。 平文で渡すものとして AUTH_CUSTOM_NAME 、 AUTH_CUSTOM_CHECKS 、 AUTH_CUSTOM_ALLOW_ACCOUNT_LINKING の3つがあります。SSM パラメータストアから取得する秘密情報として AUTH_CUSTOM_CLIENT_ID 、 AUTH_CUSTOM_CLIENT_SECRET 、 AUTH_CUSTOM_ISSUER の3つがあり、これらは Step 2 で格納した値を参照します。 AUTH_CUSTOM_NAME は Langfuse のログイン画面に表示される SSO ボタンのラベルです。わかりやすい名称を選ぶと良いでしょう AUTH_CUSTOM_CHECKS には pkce を指定し PKCE (Proof Key for Code Exchange) を有効にしています AUTH_CUSTOM_ALLOW_ACCOUNT_LINKING を true にすると、同一メールアドレスの既存アカウントと SSO アカウントが紐付けられます SSO 導入前のアカウントとの互換性のために有効にしています これが無いと後述のトラブルシュート時に Cognito 側で対応する手法が取れず Langfuse 側 DB を直接触ってアカウント管理をする手間が発生します Step 4: SSO ログインへの一本化 SSO ログインが安定稼動していることを確認した後、環境変数 AUTH_DISABLE_USERNAME_PASSWORD を true に設定してメールアドレス / パスワードによるログインを無効化しました。これにより Langfuse のログイン画面は SSO ボタンのみが表示される状態になります。アカウント管理を IAM Identity Center に一元化でき、Langfuse 側での個別のアカウント発行 / 削除が不要となります。 サンプルコード Step 1〜2 で使用した Terraform コードの抜粋を以下に示します。それぞれ collapsed になっていますので、クリックして中身を参照ください。 admin アカウント: IAM Identity Center への SAML アプリケーション登録 # Identity Store からユーザー情報を参照 data "aws_identitystore_user" "main" { for_each = toset (local.langfuse_users) identity_store_id = tolist (data.aws_ssoadmin_instances.main.identity_store_ids) [ 0 ] alternate_identifier { unique_attribute { attribute_path = "UserName" attribute_value = each.key } } } # カスタム SAML アプリケーションの登録 resource "aws_ssoadmin_application" "langfuse" { name = "Langfuse" application_provider_arn = "arn:aws:sso::aws:applicationProvider/custom-saml" instance_arn = tolist (data.aws_ssoadmin_instances.main.arns) [ 0 ] portal_options { visibility = "ENABLED" sign_in_options { origin = "IDENTITY_CENTER" } } } # SSO ログインを許可するユーザーの割り当て resource "aws_ssoadmin_application_assignment" "langfuse" { for_each = toset (local.langfuse_users) # 職責に応じてフィルタしたユーザーリスト application_arn = aws_ssoadmin_application.langfuse.arn principal_id = data.aws_identitystore_user.main [ each.key ] .id principal_type = "USER" } langfuse アカウント: Cognito の構成 # ----- SAML メタデータの管理 ----- # メタデータ XML を格納する S3 バケット(暗号化・パブリックアクセスブロックは別途設定) resource "aws_s3_bucket" "saml_metadata" { bucket = "mntsq-$ { var.env } -saml-metadata" } # IAM Identity Center からダウンロードしたメタデータ XML を参照 data "aws_s3_object" "saml_metadata" { bucket = aws_s3_bucket.saml_metadata.id key = "langfuse.xml" } # ----- SAML プロバイダ・Cognito User Pool ----- resource "aws_iam_saml_provider" "langfuse" { name = "langfuse" saml_metadata_document = data.aws_s3_object.saml_metadata.body } resource "aws_cognito_user_pool" "langfuse" { name = "langfuse" auto_verified_attributes = [ "email" ] } resource "aws_cognito_user_pool_domain" "langfuse" { user_pool_id = aws_cognito_user_pool.langfuse.id domain = "mntsq-$ { var.env } -langfuse" } # IAM Identity Center を SAML IdP として登録 resource "aws_cognito_identity_provider" "langfuse" { user_pool_id = aws_cognito_user_pool.langfuse.id provider_name = "langfuse" provider_type = "SAML" provider_details = { "MetadataFile" = data.aws_s3_object.saml_metadata.body } attribute_mapping = { email = "email" email_verified = "email_verified" name = "name" } lifecycle { # Cognito が MetadataFile を解釈して provider_details を展開するため、 # 毎回差分が出るのを抑制する ignore_changes = [ provider_details ] } } # ----- OIDC クライアント (User Pool Client) ----- resource "aws_cognito_user_pool_client" "langfuse" { name = "langfuse" user_pool_id = aws_cognito_user_pool.langfuse.id allowed_oauth_flows_user_pool_client = true allowed_oauth_scopes = [ "openid" , "email" , "profile" ] allowed_oauth_flows = [ "code" ] supported_identity_providers = [ aws_cognito_identity_provider.langfuse.provider_name ] access_token_validity = 8 id_token_validity = 8 refresh_token_validity = 1 token_validity_units { access_token = "hours" id_token = "hours" refresh_token = "days" } callback_urls = [ "https://<Langfuse のドメイン>/api/auth/callback/custom" ] default_redirect_uri = "https://<Langfuse のドメイン>/api/auth/callback/custom" generate_secret = true } # ----- 認証情報の SSM パラメータストア格納 ----- resource "aws_ssm_parameter" "auth_id" { name = "/$ { var.env } /langfuse/AUTH_CUSTOM_CLIENT_ID" type = "SecureString" value = aws_cognito_user_pool_client.langfuse.id } resource "aws_ssm_parameter" "auth_secret" { name = "/$ { var.env } /langfuse/AUTH_CUSTOM_CLIENT_SECRET" type = "SecureString" value = aws_cognito_user_pool_client.langfuse.client_secret } resource "aws_ssm_parameter" "auth_issuer" { name = "/$ { var.env } /langfuse/AUTH_CUSTOM_ISSUER" type = "SecureString" value = "https://cognito-idp.$ { data.aws_region.current.name } .amazonaws.com/$ { aws_cognito_user_pool.langfuse.id } " } トラブルシューティング 運用を開始して以降、SSO ログインが失敗するケースに遭遇しました。とくに初回ログイン時に顕著で、Cognito と Langfuse との間でユーザー情報のフェデレーションがうまくいかない場合に発生します。 リトライで解消するケースもありますが、それでも解決しない場合は Cognito 側のユーザーを一旦削除し、再度 Langfuse へ SSO ログインしてもらう ことで通るようになります。Cognito User Pool 内のユーザーは IAM Identity Center から federate されたものなので、削除しても次回の SSO ログイン時に再作成されます。 手順としては以下の通りです。 Cognito User Pool のマネジメントコンソールへ移動 User management → Users でログインに失敗しているユーザーを特定 対象ユーザーを選択し、まず Disable user access を実施 その後 Delete user を実行 ユーザー削除後に改めて SSO ログインを試みてもらえば、Cognito 側にユーザーが再作成されてログインが成功するはずです。 おわりに IAM Identity Center を IdP として Langfuse に SSO ログインを実現する構成について解説しました。ポイントは Cognito を SAML と OIDC との翻訳役として活用する 点です。 設定にあたっては Terraform とマネジメントコンソールの手作業が混在する点がやや煩雑です。とくに IAM Identity Center 側の Application metadata や Attribute mappings、SAML メタデータの取得はコンソール操作が必要であり、手順として残しておかないと再現が難しくなります。 Cognito + Langfuse の SSO 構成 については先例がありますが、IAM Identity Center を加えた構成についてはあまり情報が見当たりませんでした。このパターンは Langfuse に限らず、OIDC のみをサポートするアプリケーションに IAM Identity Center 経由の SSO を提供したい場合に広く応用できるものと考えています。同様の課題を抱えている方の参考になれば幸いです。 文責:MNTSQ 株式会社 SRE 秋本 注記:この記事は文責者の過去記事と弊社内のドキュメントをもとに Claude Opus 4.6 が作成した内容を9割程度そのまま使用しています *1 : https://langfuse.com/self-hosting/security/authentication-and-sso を参照。Langfuse は Google / GitHub / Azure AD (Entra ID) / Okta / Auth0 / Custom OAuth Provider 等をサポートしています *2 : 興味のある方向け:ECS で ClickHouse 含めて Langfuse が必要とする諸要素を動作させ、DB は Aurora PostgreSQL、キャッシュ関連は ElastiCache にて Valkey クラスタを組んで動作させています *3 : 実際には全ての設定を Terraform でカバーできるわけではないので、適宜 AWS マネジメントコンソールからの設定作業も必要になります *4 : この種の柔軟さをグループ単位で表現できなかったという内部事情もあります *5 : 厳密には Cognito User Pool の作成 → IAM Identity Center の Application metadata 設定 → Cognito 側の残りの設定、という順序を踏む必要があります。Terraform で一括管理する場合は初回適用時にこの依存関係を意識する必要があります。要するに IAM Identity Center と Cognito とを行ったり来たりする必要が2026年2月時点で有ります *6 : SAML メタデータはコード管理の対象としていません。IAM Identity Center 側で生成されるものであり、かつ XML の内容が大きいため S3 経由での参照としました。メタデータの取得手順は Step 1 に記載の通りです
本記事は 2025 年 5 月 19 日に公開された How Amazon maintains accurate totals at scale with Amazon DynamoDB を翻訳したものです。翻訳は Solutions Architect の嶋田 朱里が担当しました。 Amazon の Finance Technologies Tax チーム (FinTech Tax) は、世界中の法域で税額計算、税額控除、納付、報告といった重要なサービスを管理しています。このアプリケーションは、複数の国際マーケットプレイスで年間数十億件の取引を処理しています。 この投稿では FinTech Tax チームが Amazon DynamoDB のトランザクションと条件付き書き込みを使用して、段階的な源泉徴収を実装した方法を紹介します。 これらの DynamoDB の機能を使用することで、拡張性と回復力があるイベント駆動の税額計算サービスを構築し、大規模でもミリ秒レベルのレイテンシーを実現しました。 また、一貫したパフォーマンスを実現しながら、データの正確性を厳密に維持するための設計上の決定と実装の詳細についても探ります。 要件 Amazon は複数の法域にまたがる複雑なフィンテック (金融技術) 分野の税制環境で事業を行っており、さまざまな源泉徴収税の要件を管理する必要があります。同社には、膨大な取引量を処理できる堅牢な税処理ソリューションが必要です。このシステムは、毎日数百万件の取引をリアルタイムで処理し、個人ごとの累積取引額の正確な記録を源泉徴収税計算のために維持する必要があります。主な要件には、段階的な源泉徴収税率を正確に適用すること、および Amazon の既存システムとのシームレスな統合が含まれます。このソリューションはデータの整合性と高可用性を維持し、さまざまな源泉徴収税制度に対する規制遵守をサポートする必要があります。 課題 主な課題は世界中の複雑に絡み合った税法に厳密に準拠することにあります。特に、段階的課税モデルでは、個人の総取引額が財務年度内の特定の閾値を超えるかどうかに基づいて、異なる源泉徴収税率が適用されます。個人の累積取引額が増加し、あらかじめ定義された閾値を超えると、その取引に適用される源泉徴収税率が変更されます。例えば、総額が 100,000 インドルピー (INR) に達するまでは低い税率が適用され、その閾値を超えると、より高い税率が適用されます。 次の図は、累積取引金額の閾値に基づいて税率が段階的に変化する様子を示した、3 段階の税率モデルを示しています。 段階的課税モデルの課題は、源泉徴収についてリアルタイムの計算を行いながら、各個人の累計取引額を正確に追跡・記録管理することにあります。 Amazon は 1 日に数百万件のトランザクションを処理しなければなりません。 さらに、正の取引・負の取引(例:プラスまたはマイナスの会計調整)に関わらず、正しい源泉徴収税率をリアルタイムで適用することが求められます。 これには高い取引量 (個人あたり約 150 トランザクション/秒) を処理しながら、正確な記録を維持できるシステムが必要です。 ソリューションの概要 次の図は Amazon の源泉徴収税計算サービスの全体アーキテクチャです。 ワークフローは以下のステップで構成されています: クライアントが Amazon API Gateway に源泉徴収税計算リクエストを送信します。 API Gateway が税額計算 (Tax Computation) AWS Lambda を呼び出します。 税額計算 Lambda 関数が、DynamoDB の個人の累積トランザクションストア(Cumulative Transaction Store)テーブルを取得します。累積トランザクションストアテーブルは過去の累計値をもとに、ユーザーごとの累積取引金額をリアルタイムで管理します。これにより、段階的な税率を適用するための個人の累積取引金額の合計を正確に追跡できます。 Lambda 関数は取引の詳細と個人の累計金額に基づいて、ルールエンジンライブラリから適用される税率を取得します。取得した税率と取引データをもとに、税額が計算されます。 計算結果は取引データの監査と履歴管理のために DynamoDB の取引監査ストア (Transaction Audit Store) に格納されます。 現在の取引金額をもとに、個人の累積取引金額が累積トランザクションストアに更新されます。 DynamoDB 操作中に発生する一時的なエラー (例: ConditionalCheckFailed 、 TransactionConflict ) は、 Amazon Simple Queue Service (Amazon SQS) キューに送られ、再試行されます。 クライアントエラー (400 Validation Exception、401 Unauthorized、403 Forbidden など) や永続的なサーバー障害によるエラーは、SQS DLQ で処理されます。 実装上の考慮事項 トランザクションを受信すると、システムはルールエンジンから導出された閾値に対して個人の累積取引額を評価して、適用される税率を判断します。その後、累積取引金額は累積トランザクションストアに更新され、監査証跡も記録されます。 複数のスレッドが同一個人のデータベースを同時に更新しようとすると、競合が発生します。一般的な 楽観的排他制御 (OCC) の手法は、累積値を読み取り、指定範囲の値に対する税率を計算し、累積値が読み取り以降変更されていないという条件付きでトランザクションを書き込みます。もし値が変更されていた場合はループの最初から処理をやり直します。 トラフィックが多い場合、この再実行が頻繁に発生する可能性があります。 私たちのアプローチは、一般的な OCC パターンを改良したものです。条件の判定を「累積値が最初に読み取った時点の範囲内に留まっているか」のみに絞っています。 累積値が変化しても、その値が閾値を超えない限り、ループを再実行する必要はありません。 この方法により、条件の不一致が少なくなるため、スループットが上がります。個人の累積値がより高い範囲に移行した場合は、書き込み操作が失敗します。 その場合は、更新された値をもとに、読み取りと書き込みを再試行する必要があります。 OCC 戦略とは異なり、このアプローチでは最後の読み取り以降に値が変化していても処理が成功します。これにより、競合を最小限に抑え、スループットを向上させることができます。同時更新(累積合計が閾値を超えるケース)によって条件付き書き込みが失敗し、 ConditionalCheckFailedException が発生することがありますが、これは想定された動作であり、データの不整合を示すものではありません。 一時的なエラーを処理し、同じトランザクションの重複処理を防ぐために、クライアント要求トークン (Client Request Token, CRT) を含んだ TransactWriteItems 操作を実行することで、インクリメント操作を冪等性のある状態で行えます。 TransactionCanceledException は、 エクスポネンシャルバックオフ などのエラー処理メカニズムで処理されます。 この戦略の組み合わせにより、システムはデータの整合性を維持しながら、高いスループットとスケーラビリティを実現できます。 複雑なロック機構が不要になり、従来のOCCソリューションと比べて効率性が向上します。また、大規模な設定やチューニングを必要とせず、さまざまなトランザクション量や同時実行レベルに柔軟に対応できる、高性能なソリューションを提供します。 累積トランザクションストア 累積トランザクションストアテーブルは、特定の個人の取引金額の累積和を維持するために使用されます。以下のデータモデルを使用します: { "indvidual_id": { "S": "TIN1" // 累積合計を管理する単位となる一意識別子 }, "cumulative_amount_consumed": { // 使用された金額の累積合計を表す "N": "0" } } 税控除対象品目の在庫管理 税額控除監査ストア(Tax Deduction Audit Store)テーブルは、各取引の税控除率の監査記録を保存するために使用されます。以下のデータモデルを使用します: { "transaction_primary_key": { "S": "XXX111#2024-01-01T13:05:28" // トランザクションの一意識別子(PartitionKey#SortKey) }, "transaction_amount": { "S": "1000". //トランザクション全体の金額 }, "transaction_tax_amount": { "S": "100". //控除される税額 }, "transaction_tax_rate":{ "S":"10". //このトランザクションに適用される税率(パーセント表記) } ... } 条件付き書き込みのコード 次のコードは dynamodb.transact_write_items() を使用して、累積トランザクションストアと取引監査ストアの 2 つの DynamoDB テーブルにまたがるアトミックな条件付き書き込み操作を示しています。累積トランザクションストアから既存のレコードを取得し、現在の取引金額と既存データに基づいて cumulative_amt_consumed  (累積消費金額)の更新値を計算します。同時に、取引監査ストアに新しいレコードを記録し、ID、値、税額、税率などのトランザクション詳細を記録します。 transact_write_items() メソッドは、取引トランザクションストアテーブルへの更新操作と取引監査ストアテーブルへの put 操作を 1 つのトランザクションとして実行します。 2 つの操作がともに成功すれば、両方のテーブルに変更がコミットされます。そうでない場合は、トランザクション全体がロールバックされ、データの整合性が保たれます。 SAMPLE_TIN = 'TIN1' # 累積トランザクションストアにおける一意の識別子を表す SAMPLE_AMOUNT = 5000 # transact_write_items で処理される売上値を表す SAMPLE_TRANSACTION_ID = 'XXX111' DEFAULT_TAX_RATE = 10 # 既定の税率(パーセンテージ値) LOWER_TAX_RATE = 5 # 低い方の税率(パーセンテージ値) RETRYABLE_ERRORS = ( 'TransactionConflictException', 'ConditionalCheckFailedException', 'ProvisionedThroughputExceededException', 'ThrottlingException', 'ServiceUnavailableException', 'InternalServerErrorException' ) MAX_RETRIES = 3 RETRY_DELAY = 0.1 # 秒 def send_to_error_queue(error_message, is_retryable, transaction_id): queue_url = 'TransientErrorQueue' if is_retryable else 'NonTransientErrorQueue' message_body = { 'error_message': error_message, 'transaction_id': transaction_id } try: sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps(message_body) ) except Exception as e: print(f"Failed to send message to {queue_url}: {str(e)}") def process_transaction(tin, amount, transaction_id): for attempt in range(MAX_RETRIES + 1): try: response = dynamodb.get_item(TableName='CumulativeTransactionStore', Key={'cumulativeStore_primary_key': {'S': tin}}) item = response.get('Item') if not item: print("Record not found.") return cumulative_amount_consumed = int(item.get('cumulative_amount_consumed', {}).get('N', '0')) threshold_value = int(item.get('threshold_value', {}).get('N', '0')) current_amount = amount if (cumulative_amount_consumed + current_amount < threshold_value): update_expression = 'SET cumulative_amount_consumed = cumulative_amount_consumed + :val, tax_rate = :tax_rate' tax_rate = DEFAULT_TAX_RATE max_value = threshold_value min_value = 0 else: update_expression = 'SET cumulative_amount_consumed = cumulative_amount_consumed + :val, tax_rate = :tax_rate' tax_rate = LOWER_TAX_RATE max_value = sys.maxsize min_value = threshold_value expression_attribute_values = { ':val': {'N': str(current_amount)}, ':tax_rate': {'N': str(tax_rate)}, ':lo': {'N': str(min_value)}, ':hi': {'N': str(max_value)} } dynamodb.transact_write_items( TransactItems=[ { 'Update': { 'TableName': 'CumulativeTransactionStore', 'Key': {'cumulativeStore_primary_key': {'S': tin}}, 'UpdateExpression': update_expression, 'ConditionExpression': 'cumulative_amount_consumed < :hi AND cumulative_amount_consumed >= :lo', 'ExpressionAttributeValues': expression_attribute_values, } }, { 'Put': { 'TableName': 'TaxDeductionAuditStore', 'Item': { 'transactionID': {'S': transaction_id}, 'transaction_amount': {'N': str(amount)}, 'transaction_tax_amount': {'N': str(amount * tax_rate / 100)} } } } ], ClientRequestToken=transaction_id ) print(f"Transaction processed successfully on attempt {attempt + 1}") return # Success, exit the function except Exception as e: error_code = e.response['Error']['Code'] error_message = f"Error accessing DynamoDB: {error_code} - {e.response['Error']['Message']}" is_retryable = error_code in RETRYABLE_ERRORS if is_retryable and attempt < MAX_RETRIES: print(f"Retryable error occurred on attempt {attempt + 1}. Retrying...") time.sleep(RETRY_DELAY * (2 ** attempt)) # Exponential backoff else: send_to_error_queue(error_message, is_retryable, transaction_id) # If we've exhausted all retries error_message = f"Max retries ({MAX_RETRIES}) exceeded. Last error: {error_message}" send_to_error_queue(error_message, True, transaction_id) # Main execution try: process_transaction(SAMPLE_TIN, SAMPLE_AMOUNT, SAMPLE_TRANSACTION_ID) except Exception as e: print(f"Transaction processing failed: {str(e)}") 結果 システムのパフォーマンス評価では、実行時間を 30 秒に固定し、スレッド数を変えながら一連のテストを実施しました。 各実行後に累積トランザクションストアをゼロにリセットすることで、さまざまな負荷条件下でのシステムの動作を包括的に分析しました。 1 スレッドから 130 スレッドにスケールアップするにつれて、処理されたトランザクション数が一貫して増加したことから、システムが大規模な並列処理の場面においても高い並行性を効果的に処理できることが示されました。 しかし、この処理能力の向上には一時的な競合の増加が伴いました。これは、大規模な並列処理の場面において、パフォーマンスと競合管理のトレードオフを浮き彫りにしています。 一時的なアクセスの競合は、複数のトランザクションが同時に同じアイテムを更新しようとしたときに発生し、一部のトランザクションがキャンセルされることになります。このデータが示すのは、スレッド数を増やしても競合管理のオーバーヘッドが増大するため、スループットが大幅には向上しなくなるということです。 次のグラフはスレッド数とトランザクションメトリクスの相関関係を示しています。 これにより、スループットと競合率が同時実行スレッドの増加に伴ってどのように変化するかがわかります。 結論 この投稿では、Amazon Fintech チームが DynamoDB の強力な条件付き書き込み機能を使用することで、段階的税率アプリケーション向けのシンプルかつ高いスケーラビリティを持つソリューションを実装した方法を紹介しました。 この手法を採用し、まれに発生する ConditionalCheckFailedException を先に見越して処理することで、大量の同時トランザクションが発生するシナリオにおいても、高いスループットとスケーラビリティを実現しながら、データの一貫性を維持することができます。 この手法は、同時リクエスト数が増加するにつれボトルネックになりがちな楽観的ロックの必要性をスマートに排除しています。代わりに、Amazon Fintech システムは DynamoDB の組み込みの同時アクセス制御メカニズムを活用し、高負荷状況でも一貫したデータと効率的な更新を可能にしています。 拡張性のあるトランザクション処理システムを独自に実装するには、DynamoDB の 条件付き更新 機能を確認してください。詳しいガイダンスが必要な場合は、DynamoDB の ドキュメント を参照するか、AWS サポートにお問い合わせください。 著者について Jason Hunter はカリフォルニア在住の Amazon DynamoDB 専任のプリンシパルソリューションアーキテクトです。2003 年から NoSQL データベースに携わっています。Java、オープンソース、XML への貢献で知られています。 DynamoDB の投稿 や Jason Hunter が書いた他の投稿は、 AWS Database Blog で見つけることができます。 Balajikumar Gopalakrishnan は、Amazon Finance Technology の Principal Engineer です。 2013 年から Amazon に在籍し、Amazon の顧客の生活に直接影響を与える技術を通じて、実世界の課題を解決してきました。 仕事以外では、ハイキング、絵画、家族と過ごすことを楽しんでいます。また、映画好きでもあります。 Jay Joshi は、Amazon Finance Technology のソフトウェア開発エンジニアです。 2020 年から Amazon に在籍し、主に世界各地の法域での税額計算とレポーティングのためのプラットフォームの構築に従事しています。 仕事以外では、家族や友人と過ごしたり、新しい料理の行き先を探索したり、バドミントンをするのが好きです。 Arjun Choudhary は 2019 年からAmazon の Finance Technology 部門でソフトウェア開発エンジニアとして働いています。主な業務は、グローバルな法人税の源泉徴収プラットフォームの開発です。仕事以外では、小説を読んだり、クリケットやバレーボールをしたりして楽しんでいます。
はじめに 近年、ソフトウェア開発において「SBOM (Software Bill of Materials)」というワードを聞くことが増えてきました。 経済産業省が公開するソフトウェア管理に向けたSBOM (Software Bill of Materials) の導入に関する手引 ver 2.0において、「ソフトウェアコンポーネントやそれらの依存関係の情報も含めた機械処理可能な一覧リスト」として説明があるように、SBOMはソフトウェア部品表として広く認識されています。 SBOMに関する記事はこちらもおすすめ SBOMは単なる概念や考え方にとどまらず、実際に運用・共有することを前

動画

書籍