
GitHub
イベント
マガジン
技術ブログ
― マルチモーダル embedding の可能性と限界 ― サイオステクノロジー株式会社 Saman Elasticsearch のベクトル検索といえば、これまではテキストや画像が中心でした。 しかし最近は、テキスト・画像・動画・音声を同じ埋め込み空間で扱える「マルチモーダル embedding」が現実的な選択肢になってきています。 本記事は、Elastic Inference Service (以下 EIS) で利用できる .jina-embeddings-v5-omni-small を使い、音声ファイルを Elasticsearch に保存して kNN 検索でどこまで使えるかを検証した PoC のレポートです。 結論を先に書くと、次のとおりです。 音声を embedding 化して Elasticsearch に保存し、kNN で検索する 基本パイプラインは問題なく動いた 音声 → 音声(audio-to-audio) の類似検索は期待どおりに機能した 一方で テキスト → 音声(text-to-audio) の意味検索は、今回の条件では十分な精度が出なかった 技術ブログとして、うまくいった部分だけでなく「なぜうまくいかなかったか」も合わせて共有します。同じような検証を計画している方の参考になれば幸いです。 目次 マルチモーダル embedding と Jina v5 Omni embedding とは Jina Embeddings v5 Omni なぜ音声を検索可能にしたいか システム構成 インデックスの mapping 検証データ 音声を embedding 化して保存する Step 1: 音声を Base64 に変換する Step 2: EIS に送って embedding を生成してもらう Step 3: 返ってきた embedding を受け取る Step 4: メタデータと一緒に Elasticsearch に保存する 検証1: audio-to-audio 検索 結果 検証2: text-to-audio 検索 結果 なぜ text-to-audio は弱かったのか 1. 検証データが「音響的に似すぎている」 2. テキストと音声の意味空間が完全には揃っていない 3. 音声が短く、意味的な信号が少ない まとめ 改善方針: transcript と組み合わせた Hybrid Retrieval まとめ 参考資料 マルチモーダル embedding と Jina v5 Omni embedding とは embedding とは、データを数値ベクトルに変換したものです。たとえば「ログインできない」と「アカウントに入れない」は意味が近いので、embedding にすると近い位置のベクトルになります。 ログインできない → [0.12, -0.03, 0.45, …] アカウントに入れない → [0.11, -0.04, 0.46, …] ← 近い 今日の天気は晴れです → [0.92, 0.31, -0.20, …] ← 遠い これまでは、テキストはテキスト用モデル、画像は画像用モデルと、モダリティごとに別の embedding 空間を使うのが一般的でした。 マルチモーダル embedding は、テキストも画像も音声も「同じ空間」に埋め込みます。同じ意味を持つテキストと音声が、空間上で近い位置に置かれることが理想です。 Jina Embeddings v5 Omni 今回使ったのは Jina AI のマルチモーダル embedding モデルです。Elastic Search Labs でも、テキスト・画像・動画・音声を 1 つの Elasticsearch インデックスに保存して横断的に検索できるモデルとして紹介されています。 EIS では preconfigured な inference endpoint として以下が利用できます。 .jina-embeddings-v5-omni-small(出力次元: 1024) 利用可能なモデルは、Dev Tools で GET _inference を実行することで確認できます。 ここで重要なのは、今回使用するモデルは ChatGPT のような 回答を生成する LLM ではなく、ベクトル変換のための embedding モデル だという点です。ここを混同すると後段の評価がブレるので、最初に押さえておきます。 なぜ音声を検索可能にしたいか ビジネス的な動機は明確です。音声データの「中で何が話されているか」で検索したいというニーズは、現場にたくさんあります。 コールセンター音声から「ログインできない」と話している通話を見つけたい 問い合わせ音声を内容で分類して FAQ を改善したい 障害発生時に「画面が固まる」「決済できない」といった声が急増していないか確認したい 顧客との会話音声から、契約・解約に関する文脈を後から探したい 社内セミナーや会議音声から、必要な情報を探したい 医療の現場で、音声カルテから何かを検索したい これらは現在、文字起こし(transcript)してから text search で実現するのが普通です。マルチモーダル embedding が一定の精度で動くなら、文字起こしを介さずに 音声そのもの を検索対象に加えられる可能性があります。 また、音声データには機密情報が含まれることが多く、外部 API には投げにくいケースが少なくありません。Elastic 内で inference・indexing・検索・アクセス制御を統合できれば、データを外に出さずに音声検索基盤を構築できる点も大きなメリットです。 システム構成 今回の構成はシンプルです。 音声ファイル(.wav) │ Base64 エンコード ▼ Elastic Inference Service (.jina-embeddings-v5-omni-small) │ 1024 次元 embedding ▼ Elasticsearch (dense_vector field) │ kNN 検索 ▼ 類似音声 / 類似テキスト クライアント側(今回はローカル Mac の Python スクリプト)では、音声ファイルを Base64 化して EIS に送るだけです。embedding 生成自体は EIS 側で実行されるため、GPU やモデル管理をローカルに持つ必要はありません。 インデックスの mapping embedding を保存するインデックスの mapping は次のとおりです。 PUT audio-poc-jina-eis-v1 { "mappings": { "properties": { "audio_id": { "type": "keyword" }, "file_name": { "type": "keyword" }, "expected_topic": { "type": "keyword" }, "audio_url": { "type": "keyword" }, "embedding": { "type": "dense_vector", "dims": 1024, "index": true, "similarity": "cosine" }, "created_at": { "type": "date" }, "embedding_method": { "type": "keyword" } } } } 設計のポイントは以下です。 dense_vector の dims: 1024 は Jina v5 Omni small の出力次元に合わせる index: true を指定して kNN 検索の対象にする 類似度関数は cosine を選択。ベクトルの長さよりも方向で比較するため、テキスト/音声 embedding では一般的な選択 検証データ 検証用に、短い日本語の問い合わせ音声を5つ用意しました。 ファイル 想定トピック 話している内容 1.wav ログインできない 昨日から何度もログインしようとしていますが、正しいメールアドレスとパスワードを入力してもアカウントに入れません。 2.wav パスワード再設定 パスワードを忘れてしまったので再設定メールを送ったのですが、メールが届かず手続きが進められません。 3.wav クレジットカード決済エラー クレジットカードで支払いをしようとすると毎回エラーが出て、注文を完了できない状態です。 4.wav サービス画面の フリーズ サービスの画面が途中で固まってしまい、急ぎの作業ができないのでとても困っています。 5.wav 契約プランと 請求金額の確認 契約内容について確認したいことがあるので、現在のプランと次回の請求金額を教えてください。 すべて 同じ話者・同じ録音条件 で作成しています。 この条件が、後段で検索結果を解釈するうえで重要 になります。 音声を embedding 化して保存する 本記事のコードは GitHub で公開しています: https://github.com/SIOS-Technology-Inc/elastic-blogs/tree/main/2026-05-15-test-jina-audio ここは PoC の中核なので、少し丁寧にやったことの流れを追います。 Step 1: 音声を Base64 に変換する .wav は バイナリファイル です。HTTP の JSON ボディには通常バイナリをそのまま載せられないので、まず「テキスト」に変換する必要があります。そのために使うのが Base64 です。Base64 は、任意のバイナリデータを ASCII 文字列で表現する方式で、画像や音声を API に渡すときの定番テクニックです。 def audio_to_base64(file_path: Path) -> str: with open(file_path, "rb") as f: return base64.b64encode(f.read()).decode("utf-8") 1.wav を変換すると、こんな長い文字列になります。 UklGRiQAAABXQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YQAA... これで、音声を JSON に乗せられる形になりました。 Step 2: EIS に送って embedding を生成してもらう 次に、Base64 化した音声を Elasticsearch の inference API に送ります。EIS で公開されている embedding モデルは、以下の統一エンドポイントで呼び出します。 POST /_inference/embedding/{INFERENCE_ID} {INFERENCE_ID} には、今回は .jina-embeddings-v5-omni-small を指定します。リクエストボディは1行だけです。 { "input": ["data:audio/wav;base64,UklGRiQAAABXQVZF..."] } ここでひとつ重要なのが、Base64 文字列の前についている data:audio/wav;base64, という接頭辞です。これは データ URI と呼ばれる形式で、「この入力は wav 音声データだよ」とモデルに伝える役割を持ちます。同じエンドポイントはテキストも受け付けるので、入力がテキストなのか音声なのかをこの接頭辞で判別しています。 Python で書くとこうなります。 def get_audio_embedding(file_path: Path) -> list: audio_b64 = audio_to_base64(file_path) # data URI プレフィックスで「これは音声」だとモデルに伝える audio_input = f"data:audio/wav;base64,{audio_b64}" response = es.perform_request( "POST", f"/_inference/embedding/{INFERENCE_ID}", headers={ "Accept": "application/json", "Content-Type": "application/json", }, body={"input": [audio_input]}, ) return response["embeddings"][0]["embedding"] 実装で唯一ハマったポイントは、 Acceptと Content-Typeの両方を指定する必要があった ことです。片方しか指定しないと互換バージョンと衝突して 400 エラーになることがありました。気づくまで少し時間を使ったので、同じことを試す方は注意してください。 Step 3: 返ってきた embedding を受け取る EIS が返してくるレスポンスは次のような形です。 { "embeddings": [ { "embedding": [0.012, -0.028, 0.103, -0.057, /* ...計1024個の数値... */] } ] } embeddings[0].embedding を取り出すと、 長さ 1024 の float の配列 が手に入ります。これがその音声の「特徴を数値化したもの」です。人間には意味の分からない数値の羅列ですが、Elasticsearch にとってはこの 1024 個の数字が「似ているか」を判定する材料になります。 Step 4: メタデータと一緒に Elasticsearch に保存する 最後に、embedding をメタデータと組み合わせて、1つの ドキュメント として Elasticsearch に保存します。 indexed_doc = { ... } es.index( ... ) これで、audio-poc-jina-eis-v1 インデックスには次のような JSON ドキュメントが保存されます。 { "audio_id": "audio-001", "file_name": "1.wav", "expected_topic": "ログインできない", "embedding": [0.012, -0.028, 0.103, /* ...計1024個... */], "embedding_method": "elastic_inference_jina_omni_small", "created_at": "2026-..." } 5 つの音声に対してこの処理を繰り返すと、Elasticsearch には 5 件の音声 embedding が並びます。あとは kNN クエリで「近いベクトル」を探すだけ、という状態になりました。 検証1: audio-to-audio 検索 最初に、音声 → 音声の類似検索を試しました。1.wav を query として、最も近い音声を探します。 検索 body はシンプルな kNN クエリです。 { "knn": { "field": "embedding", "query_vector": [/* query 音声の 1024 次元 embedding */], "k": 5, "num_candidates": 10 }, "_source": ["audio_id", "file_name", "expected_topic"] } 結果 1.wav を query にした場合: 1位: 1.wav score: 0.99999 2位: 3.wav score: 0.99142 3位: 5.wav score: 0.99125 4位: 4.wav score: 0.98703 5位: 2.wav score: 0.98342 次、5.wav を query にした場合: 1位: 5.wav score: 1.0000001 ※ ※ cosine 類似度ベースの kNN スコアは (1 + cos) / 2 で計算されるため理論上の上限は 1.0 です。query ベクトルとインデックス側ベクトルが同一の場合に、浮動小数点演算の丸め誤差で形式上 1.0 をわずかに超えた値が返ることがあります。実質 1.0 と読み替えてください。 期待どおり、query にした音声自身が 1 位になりました。 音声 → 音声の類似検索は正しく動作している と判断できます。 ただし注目すべきは 2 位以下のスコアです。すべて 0.98〜0.99 という非常に高い値に集中しています。1 位は明確に分離できるものの、それ以外は ほぼ団子状態 です。この観察は、次の text-to-audio の議論の伏線になります。 検証2: text-to-audio 検索 次に、日本語テキスト query から音声を探します。クエリは同じく kNN ですが、query_vector を「テキストから生成した embedding」に差し替えます。クライアント側のコードはほぼ同じで、入力をテキストに切り替えるだけです。 結果 query: ログインできない (期待: 1.wav が1位) 1位: 5.wav (契約プランと請求金額の確認) score: 0.54343 2位: 1.wav (ログインできない) score: 0.54301 3位: 3.wav (クレジットカード決済エラー) score: 0.54214 少し長めの query でも試しました。 query: 正しいメールアドレスとパスワードを入力してもアカウントに入れません 1位: 5.wav 2位: 1.wav 期待した 1.wav は 1 位になりませんでした。さらに、上位 3 件のスコアが 0.543 前後に密集しており、意味的な分離がほとんど効いていないことが分かります。 つまり、 今回の条件では text-to-audio 検索の精度は実用レベルに届かなかった ということです。 なぜ text-to-audio は弱かったのか この PoC で最も重要なメッセージは、ここにあります。「マルチモーダルであれば何でも検索可能」というわけではない、という現実を共有できるかもしれません。 考えられる要因は複数あります。 1. 検証データが「音響的に似すぎている」 今回の 5 つの音声は、すべて以下の条件で作られています。 同じ話者 同じ口調 同じ録音条件(マイク、室内環境、無音区間の入り方) 同じくらいの長さ(10〜15 秒) 同じ「問い合わせ口調」の文体 audio embedding は、内容(何を話しているか)だけでなく、 話者の声質・録音条件・話すトーン・無音区間 といった音響的な特徴も拾います。今回のように音響条件が似すぎたデータでは、内容の違いより「音声としての雰囲気の共通性」が embedding を支配しやすくなります。audio-to-audio 検索で 2 位以下のスコアが 0.98 台に集中していたのは、まさにこの影響と整合します。 2. テキストと音声の意味空間が完全には揃っていない マルチモーダル embedding は「同じ意味のテキストと音声を近づける」ように学習されていますが、その整合性の強さは学習データやモデルサイズに依存します。 今回使ったのは omni-small(軽量モデル) です。日本語の短い問い合わせ音声で text-to-audio の対応関係を十分捉えられるかは、未知数の領域です。スコア差が 0.001 オーダーしかなくノイズに埋もれている状態は、まさにこの「整合性が弱い」状態と読めます。 3. 音声が短く、意味的な信号が少ない 各音声は 10〜15 秒程度です。短い音声ほど、テキスト query との意味的なマッチングに使える信号は少なくなります。数十秒〜数分の音声であれば、もう少し違う結果になった可能性があります。 まとめ audio-to-audio が動いて text-to-audio が弱かったのは「実装ミス」ではなく、 マルチモーダル embedding の現実的な特性と、検証データの条件が組み合わさった結果 だと考えています。 ここから引き出せる教訓はシンプルです。 「ベクトル検索の品質は、モデルだけでなく、データの性質と用途の組み合わせで決まる」 。これはマルチモーダルに限らず、テキスト embedding でも同じことが言えます。 改善方針: transcript と組み合わせた Hybrid Retrieval 実務で音声検索を本気で組むなら、音声 embedding 単体で頑張るより、 文字起こし(transcript)を併用する ほうが圧倒的に現実的です。 具体的にはこんな構成を考えています。 音声ファイル ├─ audio embedding (audio-to-audio 検索用) ├─ transcript text (BM25 / lexical 検索用) ├─ transcript embedding (semantic 検索用) └─ メタデータ (時間、顧客ID、カテゴリなど) │ ▼ Elasticsearch │ ▼ Hybrid Retrieval (RRF などで結果を統合) Elasticsearch には、複数の retriever の結果を統合する仕組みとして Reciprocal Rank Fusion (RRF) があります。これを使うと、以下を 1 リクエストで束ねられます。 音声 embedding による kNN(似た音声を探す) transcript embedding による kNN(意味の近い発話を探す) transcript への BM25(キーワード検索) メタデータでの filter(期間、顧客カテゴリなど) ユーザーが「ログインできない」と検索したとき、文字起こしテキスト側がしっかり 1.wav にマッチしてくれるはずです。音声 embedding はその上で「似た声質・トーンの問い合わせ」を補強する役割で使う、という役割分担が現実的です。 次のステップとして、この hybrid 構成を実装し、再度同じ query で評価してみる予定です。 まとめ 今回の PoC で確認できたことは次のとおりです。 EIS 経由で音声 embedding を生成できる — .jina-embeddings-v5-omni-small に Base64 音声を送ると 1024 次元の embedding が返る dense_vector に保存して kNN 検索できる — マルチモーダル検索の土台は問題なく動く audio-to-audio 検索は実用的に機能する — 同じ音声を query にすればその音声が 1 位に返る text-to-audio 検索は今回の条件では弱かった — 短く・音響条件が似た音声群では、テキスト query との意味的な分離が困難 ここから得た一番の学びは、 「マルチモーダル embedding は万能ではなく、ユースケースに合わせて他の検索手段と組み合わせて使うもの」 という現実です。 参考資料 Elastic Search Labs: Jina Embeddings v5 Omni — all media, one index https://www.elastic.co/search-labs/jp/blog/jina-embeddings-v5-omni-all-media-one-index Jina AI: jina-embeddings-v5-omni-small model card https://jina.ai/models/jina-embeddings-v5-omni-small/ Elasticsearch: dense_vector field type / kNN search / Reciprocal Rank Fusion (RRF) https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html Elastic Inference Service https://www.elastic.co/docs/explore-analyze/elastic-inference/eis The post Elastic Inference Service と Jina Embeddings v5 Omni で音声を検索する first appeared on Elastic Portal .
1. はじめに Amazon Connect Customer は音声/ビデオとチャットを個別のチャネルとしてサポートしており、それぞれ独自の API を備えています。ネイティブウィジェットやカスタムウィジェットを使う場合、各チャネルは独立して動作します。一般的なコンタクトセンターのシナリオではこれで十分です。 しかし、顧客とエージェントのやり取りが通話だけでは済まない場合はどうでしょうか? たとえば、顧客がローン申請の最終手続きのために電話をかけてきたとします。エージェントは事前承認を確認しますが、顧客は書類を確認して署名する必要がありますが、郵送された書類はまだ届いていない状況です。エージェントは顧客に電話を切って郵便を待つよう伝えるか、別でチャットを開始するしかありません。複数のやり取り、異なるエージェント、場合によっては数日の遅延が発生します。 もし通話中にエージェントが書類を送信できたらどうでしょうか? 顧客はその場で署名して返送できます。同じエージェント、同じやり取りで済ませられ、数日ではなく数分で完了することができるでしょう。 本記事ではこのような課題を扱います。音声/ビデオとチャットを統合し、シームレスな顧客体験を実現するソリューションを紹介します。 1.1. Amazon Connect Customer で実現できる理由 根本的な課題はシンプルです。ライブ通話中に顧客がエージェントとテキストメッセージやファイルをやり取りするにはどうすればよいか、ということです。 Amazon Connect Customer は必要な API とツールを提供しています。StartWebRTCContact API で音声・ビデオ通話を開始し、DescribeContact API でエージェントが応答した後のエージェント ID を取得できます。コンタクトフローは特定のエージェントにルーティングするための属性をサポートしており、チャットウィジェットは初期化時にコンタクト属性を受け取れるため、チャット開始時にアプリケーションからエージェント ID を渡せます。 これらの機能はいずれも新しいものではありません。新しいのは、それらを組み合わせる方法です。カスタム UI がアクティブな通話からエージェント ID を抽出し、チャットのルーティングロジックに渡します。チャット中も顧客は同じエージェントに接続されたままで、切断や別のキューでの待機は不要です。 1.2. ビジネス価値 1 回の顧客・エージェント間のやり取りでチャネルを統合することで、具体的な効果が得られます。 顧客にとって: コールバックや転送、別のキューでの待機が不要になります。先ほどのローンの顧客は、数日ではなく数分で申請を完了できます。 運用面: エージェントが顧客対応をエンドツーエンドで完結できます。重複作業、引き継ぎの手間、フォローアップタスクによるエージェントの負荷がなくなります。 コンプライアンス面: すべての音声・チャットのやり取りが 1 人のエージェント、1 人の顧客、1 つのケースに紐づきます。規制の厳しい業界では、チャネルをまたいだコンタクトレコードの紐づけにより監査が容易になります。 2. ソリューションのアーキテクチャ このソリューションは、カスタムフロントエンドを 3 つのレイヤー (ホスティングと配信、認証と認可、リアルタイム通信) で AWS サービスに接続します。 2.1. 概要 以下の手順は、図の番号付きラベルに対応しています。 ステップ 1 — 認証。顧客がユーザーインターフェースにログインします。フロントエンドが認証情報を Amazon Cognito ユーザープール に送信し、検証後に ID トークンが返されます。 ステップ 2 — 認可。フロントエンドが ID トークンを Amazon Cognito アイデンティティプール に渡し、 AWS STS の AssumeRoleWithWebIdentity を呼び出します。IAM ロールが Amazon Connect に対する最小権限を付与し、一時的な認証情報がフロントエンドに返されます。これは重要な設計上のポイントです。フロントエンドは長期間有効なシークレットを保持せず、すべての認証情報はスコープが限定され、短期間で失効します。 ステップ 3 — 音声・ビデオ通話。顧客が通話を開始します。フロントエンドは一時的な認証情報を使って StartWebRTCContact API を呼び出し、WebRTCQueueRouting コンタクトフローをトリガーします。このフローが対応可能なエージェントに通話を割り当て、 Amazon Chime SDK のミーティング設定を返します。フロントエンドは Chime SDK セッションを初期化し、リアルタイムの音声・ビデオストリームを管理します。同時に、フロントエンドは DescribeContact API を呼び出してアクティブなコンタクトからエージェント ID を取得し、ローカルに保存します。 ステップ 4 — 同じエージェントとのチャット。顧客がチャットを開くと、フロントエンドは保存済みのエージェント ID を Amazon Connect Customer チャットウィジェット に渡します。チャットウィジェットは Amazon Connect Customer のホストエンドポイントから読み込まれます。チャットコンタクトが ChatAgentRouting コンタクトフローをトリガーし、エージェント ID を使って通話中の同じエージェントに直接ルーティングします。このステップで、音声/ビデオとチャットが 1 人のエージェントに集約されます。やり取りが終了すると、フロントエンドは StopContact と DisconnectParticipant API を呼び出してセッションを適切に終了します。 2.2. フロントエンドコンポーネント ユーザーインターフェースは 5 つのコンポーネントで構成され、それぞれ異なる役割を担います。 Authentication State Manager は、Cognito ユーザープールのフローを通じてログインを処理し、ID トークンを生成します。 Credential Manager は、そのトークンを Amazon Connect Customer API にスコープされた一時的な AWS 認証情報と交換します。 Session Manager は通話のセッション全体を管理します。暗号化されたセッションコンテキストをローカルストレージに保存し、通話の開始を制御し、チャットウィジェットのルーティング先となるエージェント ID を取得します。 WebRTC Manager はリアルタイムメディアを管理します。StartWebRTCContact の呼び出し、Chime SDK セッションの初期化、音声/ビデオストリームの管理を行います。 Chat Widget は Amazon Connect Customer のホストエンドポイントから読み込まれます。Session Manager からエージェント ID を受け取り、チャットを同じエージェントにルーティングします。コンタクトの作成、WebSocket 接続、メッセージング、ファイル添付など、チャットのライフサイクル全体を処理します。 2.3. バックエンドサービス バックエンドは 3 つのレイヤーの AWS サービスで構成されています。 ホスティングと配信: Amazon CloudFront がセキュリティヘッダーとキャッシュを使って UI をグローバルに配信します。Amazon S3 に静的アセットを保存します。 認証と認可: Amazon Cognito ユーザープール、Amazon Cognito アイデンティティプール、AWS STS、IAM が連携します。フロントエンドには必要最小限の権限のみが付与されます。 コミュニケーション: リアルタイムのやり取りが行われるレイヤーです。StartWebRTCContact API が WebRTCQueueRouting フローをトリガーしてエージェントを割り当てます。API は Amazon Chime SDK の設定を返し、フロントエンドがリアルタイムの音声・ビデオを確立します。DescribeContact API でエージェント ID を取得し、ChatAgentRouting フローがチャットを同じエージェントにルーティングします。StopContact と DisconnectParticipant API でセッションをクリーンアップします。 3. 前提条件 デプロイには、以下の準備が必要です。 AWS アカウント ファイル添付が有効化 された Amazon Connect Customer インスタンス AWS CDK v2 のインストールと設定 Node.js v20.x 以降 適切な権限で設定された AWS CLI 4. ソリューションのデプロイとクリーンアップ ソリューション全体は AWS CDK アプリケーションとして GitHub リポジトリ にパッケージ化されています。このスタックは CloudFront、S3、Cognito、IAM ロール、Amazon Connect Customer コンタクトフローなど、すべてをプロビジョニングします。 README にリポジトリのクローン、依存関係のインストール、Connect インスタンスの設定、スタックのデプロイ、テストユーザーの作成、統合体験の検証まで、各手順が記載されています。 以下のステップバイステップのデプロイ手順に沿って進めてください。テストが完了したら、不要な課金を避けるためにすべてのリソースを削除してください。 5. まとめと次のステップ 本記事では、1 回の顧客・エージェント間のやり取りで、Amazon Connect の 1 人のエージェントを通じて音声/ビデオとチャットを統合する方法を紹介しました。この方法は StartWebRTCContact と DescribeContact API、コンタクトフローのルーティング、Amazon Chime SDK、標準のチャットウィジェット、Amazon Cognito と AWS STS による短期間有効な AWS 認証情報など、既存の機能を組み合わせたソリューションです。 これは 1 つのアプローチにすぎません。完全な柔軟性を求める場合は、音声/ビデオとチャットのカスタムウィジェットをゼロから構築することもできます。Amazon Connect API ( StartChatContact でチャットを開始、 CreateParticipantConnection で WebSocket 接続を確立、 SendMessage でメッセージング、 StartAttachmentUpload と CompleteAttachmentUpload でファイル共有) を使えば、すべてのインタラクションをきめ細かく制御できますが、実装の複雑さが増します。可能な限り Amazon Connect Customer の組み込み機能を活用し、必要な部分だけカスタマイズすることをお勧めします。 まず、書類への署名、ビジュアルトラブルシューティング、フォーム送信など、顧客がタスクを完了するために複数のタッチポイントを必要とするケースを特定しましょう。 GitHub リポジトリ からソリューションをデプロイし、実際に動作を確認してみてください。その後、1 つのチームと 1 つのユースケースでパイロットを実施し、導入前後の対応時間、初回解決率、顧客の手間の変化を測定しましょう。そのデータをもとに、ソリューションが自社のカスタマーサービス運用に適しているかを評価しましょう。 6. 関連資料 ソリューションの GitHub リポジトリ Amazon Connect Customer 管理者ガイド: アプリ内、ウェブ、ビデオ通話、画面共有機能のセットアップ Amazon Connect Customer StartWebRTCContact API Amazon Connect Customer DescribeContact API Amazon Chime SDK for JavaScript Amazon Cognito デベロッパーガイド Amazon Connect Customer 管理者ガイド: ウェブサイトにチャットユーザーインターフェースを追加する 著者について Ying Qian は、コンタクトセンター技術分野で 19 年以上の経験を持ち、ソリューションアーキテクト、テクニカルプロジェクトマネージャー、ICT リードエンジニア、オペレーションエンジニアなどの経験があります。AWS ではサービス担当ソリューションアーキテクトとして Amazon Connect Telephony & Resiliency SME チームをリードし、AWS Well-Architected Framework の原則に沿った Amazon Connect の導入を支援しています。仕事以外ではジョギング、家族とのアルプスハイキング、ボーデン湖での水泳を楽しんでいます。 Nelson Martinez はシドニーを拠点とする Applied AI シニアソリューションアーキテクトです。コンタクトセンター、ユニファイドコミュニケーション、IP テレフォニー、ネットワーキングの分野でオーストラリアと米国にまたがり 31 年以上の経験を持ちます。AWS で 5 年以上にわたり、クラウドコンタクトセンターと Applied AI ソリューションを専門とし、グローバル規模で業界をリードする実装を直接お客様と進めています。 翻訳はテクニカルアカウントマネージャーの高橋が担当しました。原文は こちら です。
本記事は 2026 年 5 月 7 日 に公開された「 Migrating data from an Amazon Aurora snapshot into Amazon Aurora DSQL 」を翻訳したものです。 Amazon Aurora DSQL は、高可用性、無制限のスケール、マルチリージョンの強整合性、そしてインフラ管理不要を実現したサーバーレス分散 SQL データベースです。Aurora DSQL はデータベースへのデータ移行に PostgreSQL の COPY コマンドをサポートしており、Aurora DSQL 向けに COPY コマンドを利用しやすくした dataloader スクリプトも提供しています。ただしこの方式は、テーブルを 1 つずつ移行する必要があるうえ、ソースデータベースから移行先 Aurora DSQL クラスターへデータをコピーする中継用コンピュートインスタンスを別途用意しなければならず、ソースとターゲット間でデータを変換する手段も用意されていません。大規模なデータ移行や、データ型変換、スキーマ変更、その他の変換が必要な移行には、マネージドな移行手法のほうが適しています。本記事では AWS Glue を使って Amazon Aurora のスナップショットから Aurora DSQL クラスターへデータを移行する方法を紹介します。 ソリューション概要 AWS Glue は、Extract, Transform, Load (ETL) 処理を行う Apache Spark ジョブ向けにマネージドな並列実行環境を提供するデータ統合サービスです。AWS Glue では、移行に必要なデータ変換を PySpark スクリプトとして記述でき、移行に利用するコンピュートノードの数とキャパシティを指定して実行できます。AWS Glue が背後のコンピュートインフラを管理し、コンピュートノード間で処理の分散と並列実行をオーケストレーションします。 本記事では、Amazon Aurora PostgreSQL-Compatible Edition から Aurora DSQL へ、データベーススナップショットと AWS Glue を使って 2 つのテーブルを持つデータベースを移行する例で、この移行手法を紹介します。今回の移行のワークフローは次の図のとおりです。 移行ワークフローは次のとおりです。 Aurora PostgreSQL クラスターのスナップショットを作成します。 Aurora スナップショットの S3 へのエクスポート機能 を使い、スナップショットから Amazon Simple Storage Service (Amazon S3) バケットへ Parquet 形式でデータを抽出します。 AWS Glue クローラーを作成・実行して S3 上の Parquet ファイルを発見し、スキーマを判定し、スキーマとファイルの場所を AWS Glue Data Catalog に記録します。ソースデータベースのテーブルごとに 1 つのクローラーを作成します。 Data Catalog を参照して S3 からファイルを見つけて読み込み、必要なデータ変換を行ったうえで Aurora DSQL に書き込む PySpark ETL ジョブを AWS Glue で作成します。 ETL ジョブを実行し、ワンタイムのデータロードを行います。 Aurora DSQL は 1 クラスターあたり 1 データベースのみをサポートしますが、Aurora クラスターは複数のデータベースをホストできます。複数のデータベースをホストする Aurora クラスターを移行するには、Aurora スナップショットに含まれるデータベースごとにこの移行プロセスを繰り返し、データベースをそれぞれ独立した Aurora DSQL クラスターにデプロイするか、Aurora DSQL の 1 つのデータベース内に複数のスキーマとして移行する必要があります。 データ型変換 Aurora スナップショットの S3 エクスポート処理では、Aurora DSQL への最終的な書き込みに影響するデータ変換が行われます。一部の変換は修正や再変換が必要になる場合があり、その作業は AWS Glue ETL ジョブ内の PySpark で行えます。たとえば、ソースデータベースの timestamp 型カラムは、スナップショットエクスポート時に Parquet のバイト配列に変換され 、PySpark で読み込んだ際には文字列オブジェクトとして解釈されます。Aurora DSQL に書き込む前に、この文字列を timestamp 型へ戻す必要があります。スナップショットエクスポート処理が PostgreSQL のデータ型を Parquet にどう変換するかは、 エクスポート関連のドキュメント を参照してください。 Aurora DSQL はオープンソースの PostgreSQL で利用できる全データ型をサポートしているわけではありません。Aurora DSQL がサポートするデータ型の一覧は Aurora DSQL ユーザーガイド を参照してください。ソースデータベースのテーブルで Aurora DSQL がサポートしないデータ型を使っているカラムを特定し、Aurora DSQL ではどう表現するかを決め、AWS Glue PySpark ジョブで変換を行う必要があります。該当するカラムはスナップショットエクスポート時にも変換される場合があるため、PySpark スクリプトでその点も考慮しなければなりません。 主キーの扱い 多くのアプリケーションは主キーに連番の整数を使います。新しいデータに一意の識別子を自動付与でき、ほとんどのリレーショナルデータベースでは新しい行がストレージ上で近い位置に配置されるため、最近追加された行が他の新しい近接行の読み取りでバッファキャッシュに乗りやすくなります。アプリケーションでは古いデータより新しいデータのほうが頻繁に読まれる傾向があり、ストレージからの読み取りよりキャッシュからの読み取りのほうがはるかに高速なため、連番識別子は多くのアプリケーションで読み取り性能の向上につながります。 ただし Aurora DSQL はバッファキャッシュを提供しておらず、大規模に連番整数キーを使うとホットなストレージパーティションが発生する可能性があります。Aurora DSQL のレンジパーティショニングでは新しいデータがすべて同じストレージパーティションに配置されるためです。代わりに、レンジパーティション化されたストレージで分散が良くなる主キーを選びましょう。テーブル内に既に存在するカラムの中から、カーディナリティの高いカラムに他の 1 つ以上のカラムを続けた複合キーを選ぶことを推奨します。この種のキーはデータへのアクセス方法と一致しやすく、追加のセカンダリインデックスを必要としません。また、テーブル定義のみを変更しテーブル内のデータは変えないため、移行時に追加のデータ変換も必要ありません。 ただし、これが常に可能とは限りません。場合によっては、UUID (Universally Unique Identifier) かランダム化された識別子で旧主キーを置き換える新しい主キーカラムを作成する必要があります。移行時に主キーを変換するのは難しく、外部キー関係も修正しなければなりません。移行検証のためにソースデータベースとターゲットデータベース間で行をマップできるよう、元の識別子を別カラムに残しておきたい場合もあります。データベースを利用するすべてのアプリケーションも新しい識別子を使うように更新する必要があります。 本記事の例では主キーを UUID に変換し、外部キー関係を修正することで、AWS Glue と PySpark を使った主キー変換の進め方を示します。元の主キーは別カラムに残し、ソースデータベースへマップし直せるようにします。 移行手順 本セクションでは、架空の小売アプリケーション向けデータベースを Aurora PostgreSQL から Aurora DSQL へ移行する手順を順を追って説明します。本記事では、例を交えて移行プロセスを示し、実際の移行に応用していただくことを目的としています。 Aurora PostgreSQL のソースデータベース名は「storefront」です。storefront には次のテーブルを持つ sales スキーマが含まれています。 CREATE SCHEMA sales; CREATE TABLE sales.customers ( id integer NOT NULL, username character varying(50), first_name character varying(50), last_name character varying(50) ); CREATE TABLE sales.orders ( id integer NOT NULL, customer_id integer, order_date date, order_timestamp timestamp without time zone, product_details jsonb, quantity integer, unit_cost numeric(6,2), unit_weight real ); これらのテーブルは実際の小売データベースとしては必ずしも適切ではありませんが、Aurora DSQL でどう変換されるかを示すためにさまざまなデータ型のカラムを持たせています。 sales.orders テーブルの customer_id カラムは sales.customers テーブルの id カラムを参照しています。簡潔にするためインデックスや制約は省略しています。 前提条件 この例の移行は本番環境で実施しないでください。実行には、AWS アカウントと、移行に必要なリソース (AWS Identity and Access Management (IAM) のロールやアクセス許可を含む) を作成できる十分な権限が必要です。例のソリューションを自分の移行に応用する場合は、本番データを移行する前に必ず非本番アカウントで作業し、十分にテストしてください。 また、ワークステーションのローカルか、AWS 環境で動作するコンピュートインスタンス上で動作する Unix bash シェルセッションへのアクセスも必要です。ワークステーションは AWS アカウントおよびソース/ターゲットデータベースへのネットワークアクセスを持ち、最近のバージョンの AWS Command Line Interface (AWS CLI) がインストールされている必要があります。AWS CLI は前述の IAM ロールで 設定されている必要があります 。 AWS 環境でのデータベース操作、Unix シェル、AWS マネジメントコンソールや AWS CloudFormation などの AWS サービスについて中級程度の経験があることを前提としています。そのため、ソースデータベースの構築、データベースへの接続、コマンドを実行するワークステーションのセットアップについての具体的な手順は示しません。 本移行には Amazon S3 バケット、AWS Glue クローラー、AWS Glue ジョブ、 AWS Key Management Service (AWS KMS) キー、エンドツーエンドのワークフローを実現するためのアクセス許可を付与する複数の IAM ポリシーとロールが必要です。利便性のため、これらのコンポーネントは AWS CloudFormation テンプレートでデプロイし、移行に使う PySpark コードもあわせてデプロイします。CloudFormation テンプレートと関連ファイルは GitHub リポジトリで公開しており、ワークステーションにダウンロードしてください。次のコマンドでプロジェクトファイルをダウンロードします。 git clone git@github.com:aws-samples/sample-migrate-to-dsql.git リポジトリには amazon-aurora-snapshots-to-dsql というサブディレクトリがあり、以下のファイルが含まれています。本記事ではこのディレクトリを「プロジェクトディレクトリ」と呼びます。 プロジェクトに含まれるファイルは次のとおりです。 stack.yml 後続のセクションで参照する S3 バケット、AWS KMS キー、AWS Glue クローラー、AWS Glue ジョブ、関連する IAM ポリシーおよびロールを作成します。 ddl-dsql.sql ターゲットの Aurora DSQL クラスターに必要なスキーマ、テーブル、インデックスを作成する SQL コマンドが含まれています。 storefront.sql.zip ソースデータベースのスキーマとテーブルを作成し、サンプルデータをテーブルに投入する SQL コマンドが含まれています。 AWS アカウント内の Amazon Virtual Private Cloud (Amazon VPC) のプライベートサブネットに、 新しい Aurora PostgreSQL クラスターを作成します 。本例では PostgreSQL 17.7 を使用しています。クラスター名は「prod-cluster」とし、 初期データベース名 には「storefront」を指定します。クラスター作成時に初期データベース名を設定し忘れた場合は、クラスターにログインし、次の SQL 文でデータベースを作成します。 CREATE DATABASE storefront; プロジェクトディレクトリの bash シェルで次のコマンドを実行し、データベーススキーマを作成してサンプルデータをロードします。 unzip storefront.sql.zip psql -h <<your cluster's hostname>> -U postgres -f storefront.sql -d storefront データベースクラスターの「postgres」ユーザーのパスワード入力を求められます。データのロード後、storefront データベースにログインし、次の SQL コマンドでサンプルデータを確認します。 select * from sales.customers limit 20; select * from sales.orders limit 20; これでソースデータベースの準備ができました。Aurora DSQL へのデータ移行に進みます。 移行の実行 まず Aurora DSQL クラスターを作成します。Unix 系のコマンドラインで次のコマンドを実行し、「storefront」という名前の単一リージョン Aurora DSQL クラスターを作成し、エンドポイントと Amazon Resource Name (ARN) を新しい環境変数に保存します。クラスター作成は数秒で完了します。 export DSQL_CLUSTER_ID=($(aws dsql create-cluster --no-deletion-protection-enabled --tags Name=storefront --output text --query 'identifier')) export DSQL_ENDPOINT=$(aws dsql get-cluster --identifier $DSQL_CLUSTER_ID --output text --query 'endpoint') export DSQL_CLUSTER_ARN=$(aws dsql get-cluster --identifier $DSQL_CLUSTER_ID --output text --query 'arn') クラスターのステータスは次のコマンドで確認します。 aws dsql get-cluster --identifier $DSQL_CLUSTER_ID \ --output text --query 'status' ステータスが「ACTIVE」になるまで何度か実行します。Aurora DSQL クラスターがアクティブになったら、 クラスターに接続 し、GitHub プロジェクトの ddl-dsql.sql ファイルにあるコマンドを実行してスキーマとテーブルを作成します。 次に、先ほどの GitHub プロジェクトの stack.yml テンプレートファイルを使い、「apg-to-dsql」という名前の AWS CloudFormation スタックを作成します。スタック名は後で参照するため重要です。スタックには次のパラメータが必要です。 DSQLClusterEndpoint 作成した Aurora DSQL クラスターのエンドポイント。AWS マネジメントコンソールのクラスター詳細ページで確認できます。 DSQLClusterArn 作成した Aurora DSQL クラスターの ARN。AWS マネジメントコンソールのクラスター詳細ページで確認できます。 LoaderJobCapacity DSQL ローダージョブに割り当て可能な AWS Glue data processing unit (DPU) の最大キャパシティで、2〜100 の範囲で指定します。DPU は処理能力の相対的な指標で、4 vCPU と 16 GB メモリで構成されます。移行のサイズと複雑さに応じて DPU 数を選びます。 ExportJobName スナップショットエクスポートジョブの名前。アカウント内で一意である必要があります。 SourceDatabaseName Aurora PostgreSQL のソースデータベース名。 SourceSchemaName ソースデータベースから移行するスキーマ名。 サンプル移行のデフォルト値でスタックを作成するには、プロジェクトディレクトリで次のコマンドを実行します。 aws cloudformation create-stack --stack-name apg-to-dsql \ --template-body file://stack.yml --capabilities CAPABILITY_NAMED_IAM \ --parameters ParameterKey=DSQLClusterEndpoint,ParameterValue=$DSQL_ENDPOINT ParameterKey=DSQLClusterArn,ParameterValue=$DSQL_CLUSTER_ARN aws cloudformation wait stack-create-complete --stack-name apg-to-dsql スタックの完了を待ちます。後続の手順で必要となる出力値が複数あります。 KmsKeyArn エクスポートしたスナップショットデータを暗号化するために作成された AWS KMS キーの ARN。 SnapshotExportRoleArn スナップショットエクスポート処理が S3 にデータを保存するために必要な IAM ロールの ARN。 GlueRoleName AWS Glue ジョブが必要なアクセスを得るための IAM ロール名。 GlueRoleArn AWS Glue ジョブが必要なアクセスを得るための IAM ロールの ARN。 GlueJobName AWS Glue ジョブの名前。 後続のコマンドで使いやすいように、スタックパラメータと出力値を環境変数に取り込みます。スタック名を「apg-to-dsql」以外にした場合は、コマンドを設定したスタック名に合わせて修正してください。 export EXPORT_JOB_NAME=$(aws cloudformation describe-stacks --stack-name apg-to-dsql --query 'Stacks[0].Parameters[?ParameterKey==`ExportJobName`].ParameterValue' --output text) export BUCKET_NAME=$(aws cloudformation describe-stacks --stack-name apg-to-dsql --query 'Stacks[0].Outputs[?OutputKey==`BucketName`].OutputValue' --output text) export DATABASE_NAME=$(aws cloudformation describe-stacks --stack-name apg-to-dsql --query 'Stacks[0].Parameters[?ParameterKey==`SourceDatabaseName`].ParameterValue' --output text) export EXPORT_ROLE_ARN=$(aws cloudformation describe-stacks --stack-name apg-to-dsql --query 'Stacks[0].Outputs[?OutputKey==`SnapshotExportRoleArn`].OutputValue' --output text) export KMS_KEY_ARN=$(aws cloudformation describe-stacks --stack-name apg-to-dsql --query 'Stacks[0].Outputs[?OutputKey==` KmsKeyArn`].OutputValue' --output text) export GLUE_JOB_NAME=$(aws cloudformation describe-stacks --stack-name apg-to-dsql --query 'Stacks[0].Outputs[?OutputKey==`GlueJobName`].OutputValue' --output text) 次のコマンドでデータベーススナップショットを作成します。スナップショット名は「migrate-to-dsql」、ソースデータベースクラスター名は前述の「prod-cluster」です。スナップショットの ARN は後で使うため SNAPSHOT_ARN という環境変数に取り込みます。 export SNAPSHOT_ARN=$(aws rds create-db-cluster-snapshot --db-cluster-snapshot-identifier migrate-to-dsql --db-cluster-identifier prod-cluster --output text --query 'DBClusterSnapshot.DBClusterSnapshotArn') スナップショットのステータスは次のコマンドで確認します。 aws rds describe-db-cluster-snapshots \ --db-cluster-snapshot-identifier migrate-to-dsql --output text \ --query 'DBClusterSnapshots[*].Status' ステータスが「available」になるまで何度か実行し、その後で次のコマンドでスナップショットをエクスポートします。 aws rds start-export-task --export-task-identifier "$EXPORT_JOB_NAME" \ --source-arn $SNAPSHOT_ARN --s3-bucket-name "$BUCKET_NAME" \ --iam-role-arn "$EXPORT_ROLE_ARN" --kms-key-id "$KMS_KEY_ARN" エクスポートジョブのステータスが「COMPLETE」になるまで、次のコマンドを定期的に実行して状態を確認します。 aws rds describe-export-tasks --export-task-identifier "$EXPORT_JOB_NAME" --query 'ExportTasks[0].Status' --output text ここまでで、移行に必要なインフラの構築、ソースデータベースのスナップショット作成、Parquet ファイルとしての S3 バケットへのスナップショットエクスポートが完了しました。次に AWS Glue クローラーを実行してエクスポート済みデータをカタログ化し、PySpark ジョブを実行して Aurora DSQL にデータをロードします。 ソースデータベースからエクスポートした両方のテーブルに対して、次のコマンドで AWS Glue クローラーを実行します。クローラーは CloudFormation テンプレートで作成済みです。 aws glue start-crawler --name customers aws glue start-crawler --name orders クローラージョブの状態は次のコマンドで取得できます。クロール対象データの量によっては完了まで数分かかることがあります。両方のジョブが「COMPLETED」になるまで 1 分ごとに実行してください。 aws glue list-crawls --crawler-name customers \ --output text --query 'Crawls[0].State' aws glue list-crawls --crawler-name orders --output text --query 'Crawls[0].State' 両方のクローラージョブが完了したら、次のコマンドでクローラーがエクスポート済みスナップショットデータからカタログ化したテーブルとカラムを確認します。AWS Glue ローダージョブはこのカタログを使って S3 バケット内のエクスポート済みスナップショットデータを見つけます。 aws glue get-tables --database-name "$DATABASE_NAME" \ --query 'TableList[*].[Name,StorageDescriptor.Columns[*]]' 出力は次のようになります。 [ [ "sales_customers", [ { "Name": "id", "Type": "int" }, { "Name": "username", "Type": "string" }, { "Name": "first_name", "Type": "string" }, { "Name": "last_name", "Type": "string" } ] ], [ "sales_orders", [ { "Name": "id", "Type": "int" }, { "Name": "customer_id", "Type": "int" }, { "Name": "order_date", "Type": "string" }, { "Name": "order_timestamp", "Type": "string" }, { "Name": "product_details", "Type": "string" }, { "Name": "quantity", "Type": "int" }, { "Name": "unit_cost", "Type": "string" }, { "Name": "unit_weight", "Type": "float" } ] ] ] 次のコマンドで AWS Glue ジョブを実行します。 export JOB_RUN_ID=($(aws glue start-job-run --job-name "$GLUE_JOB_NAME" --output text --query 'JobRunId')) ジョブのステータスは次のコマンドで取得します。完了するまで定期的に実行してください。 aws glue get-job-run --job-name "$GLUE_JOB_NAME" --run-id $JOB_RUN_ID --output text --query 'JobRun.JobRunState' ジョブが完了すれば、データ移行は完了です。 検証 行数のカウントと数値カラムの合計を取り、移行が正しく完了したか検証します。 まずソースの Aurora PostgreSQL データベースとターゲットの Aurora DSQL クラスターの両方で次のクエリを実行します。 select count(*) from sales.customers; 両方の件数が一致するはずです。次のクエリもソースとターゲットの両方で実行し、件数と合計値がすべて一致することを確認します。 select count(*), sum(quantity), sum(unit_cost) from sales.orders; この単純な検証方法は本番環境の移行には十分ではありません。実際には、移行中に行った変換処理を考慮しつつ、すべてのカラム値を行ごとに比較する必要があります。 AWS Glue ジョブを理解する 本移行手法には多くの構成要素がありますが、唯一複雑なのはスクリプトを自分で書く必要のある AWS Glue ジョブです。本記事の AWS Glue ジョブはシンプルで、いくつかのデータ型変換と整数識別子から UUID への変更しか行っていませんが、より複雑な移行を構築する際の基本要素を示しています。本サンプル移行の AWS Glue ジョブのコードは CloudFormation スタックでデプロイ済みです。閲覧するには AWS Glue コンソールで ETL jobs を選択し、 storefront-snapshot-dsql-loader ジョブを選びます。スクリプトエディタにスクリプトが読み込まれます。コードを順に確認していきましょう。 ファイル冒頭はセットアップコードで、ライブラリのインポート、API コンポーネントの初期化、CloudFormation テンプレートで設定したジョブ設定パラメータの取得を行います。該当部分は次のとおりです。 import boto3 import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.sql import SparkSession from pyspark.sql import functions as func from pyspark.sql.functions import to_timestamp, to_date, col from awsglue.job import Job # Read job configuration parameters args = getResolvedOptions(sys.argv, ['JOB_NAME', 'dsql_endpoint', 'glue_database', 'schema']) # Job initializations dsql = boto3.client('dsql') sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) 続く数行のコードでは、AWS Glue カタログを使って S3 上のファイルを見つけ構造を解釈しつつ、customers と orders のデータを読み込みます。 # Read the exported snapshot tables from S3 customers_df = glueContext.create_dynamic_frame_from_catalog(database=args['glue_database'], table_name=args['schema'] + '_customers').toDF() orders_df = glueContext.create_dynamic_frame_from_catalog(database=args['glue_database'], table_name=args['schema'] + '_orders').toDF() 次にデータにいくつかの変換を適用します。データ型をいくつか変換し、テーブルの主キーを UUID に変換しつつ、元の ID は別カラム名で残します。まず customers テーブルから始めます。 # Perform transformations for the customers table customers_df.createOrReplaceTempView('customers') customers_adjusted = spark.sql('SELECT uuid() AS id, customers.id AS old_id, username, first_name, last_name FROM customers') customers_adjusted.createOrReplaceTempView('customers') ここでは id カラムを「old_id」にリネームします。続いて新しい「id 」 カラムを作成し、各行に新たに生成した UUID 値を設定します。これらの変換と後続ステップで残すカラムの選択は、使い慣れた SQL で行えます。 orders テーブルでも同様の処理を行いますが、加えて customer_id カラムを「old_customer_id」にリネームします。「old_customer_id」カラムは一時的なもので、後ほど UUID 主キーへの切り替えに伴う外部キー関係の修正で使います。 # Perform transformations for the orders table orders_df.createOrReplaceTempView('orders') orders_adjusted = spark.sql('SELECT uuid() AS id, orders.id AS old_id, customer_id as old_customer_id, order_date, order_timestamp, product_details, quantity, unit_cost, unit_weight FROM orders') 次に「order_timestamp」カラムと「order_date」カラムを修正し、Parquet 変換で文字列になっていた値を本来の timestamp 型と date 型に戻します。 to_timestamp() と to_date() を使っている点に注目してください。 orders_adjusted = orders_adjusted.withColumn('order_timestamp', to_timestamp(col('order_timestamp'))) orders_adjusted = orders_adjusted.withColumn('order_date', to_date(col('order_date'))) 続いて orders と customers の外部キー関係を新しい UUID 主キーを使うように修正し、Aurora DSQL のデータベースには残したくない「old_customer_id」カラムを削除します。Aurora DSQL は現状で外部キー制約を強制しませんが結合はサポートしているため、効率的な結合のために関係を修正することは重要です。 # Fix foreign key relationship between orders and customers orders_adjusted.createOrReplaceTempView('orders') orders_final = spark.sql('select o.id, c.id as customer_id, o.order_date, o.order_timestamp, o.product_details, o.quantity, o.unit_cost, o.unit_weight from orders o inner join customers c on o.old_customer_id = c.old_id') customers_adjusted = customers_adjusted.drop('old_id') データ変換の作業はこれで完了です。移行のアーキテクチャを示すために基本的な変換のみを紹介しましたが、PySpark は表現力が高く、移行で必要なあらゆる変換を実装できます。 データの準備ができたので、DSQL に書き込みます。次のコードがその処理です。 # Fetch token for authorization jdbc_url = 'jdbc:postgresql://' + args['dsql_endpoint'] + ':5432/postgres?sslmode=require' password = dsql.generate_db_connect_admin_auth_token(args['dsql_endpoint'], ExpiresIn=28800) # Write tables to DSQL customers_adjusted.write \ .format("jdbc") \ .mode('append') \ .option("url", jdbc_url) \ .option("dbtable", 'sales.customers') \ .option("user", "admin") \ .option("password", password) \ .option('sslmode', 'require') \ .option('isolationLevel', 'NONE') \ .option('batchsize', '2500') \ .option('stringtype', 'unspecified') \ .save() orders_final.write \ .format("jdbc") \ .mode('append') \ .option("url", jdbc_url) \ .option("dbtable", 'sales.orders') \ .option("user", "admin") \ .option("password", password) \ .option('sslmode', 'require') \ .option('isolationLevel', 'NONE') \ .option('batchsize', '2500') \ .option('stringtype', 'unspecified') \ .save() job.commit() まず、コードは IAM 認証 用のトークンを生成します。このトークンをデータベースのパスワードとして使います。移行ジョブの実行を通じてトークンが有効であり続けるよう、認証トークンに長めのタイムアウトを設定しています。 次に DataFrameWriter を使い、DataFrame の write() を呼び出して customers と orders の DataFrame を Aurora DSQL のそれぞれのテーブルに書き込みます。Spark が Aurora DSQL でテーブル作成を試みないよう、 DataFrameWriter の書き込みモードを「append」に設定します。Aurora DSQL は SSL 接続が必須のため、「sslmode」を「require」に設定します。 DataFrameWriter のデフォルト動作では、insert を複数バッチに分割しつつもすべてを 1 つのデータベーストランザクションで実行します。この挙動は Aurora DSQL の「1 トランザクションあたり 3,000 行まで」という変更行数の制限に抵触し、AWS Glue ジョブが失敗する原因になります。これを回避するため「isolationLevel」を「NONE」に設定し、各バッチの後に DataFrameWriter がコミットするよう強制します。バッチサイズが DSQL の制限を超えないよう「batchsize」を「2500」に設定しています。 クリーンアップ 移行を実行し、結果を検証し、AWS Glue スクリプトを理解できたところで、本記事で作成したリソースをクリーンアップします。 まず、次のコマンドで CloudFormation スタックを削除します。 aws cloudformation delete-stack --stack-name apg-to-dsql 次のコマンドを定期的に実行し、コマンドが「Stack with id apg-to-dsql does not exist」というエラーを返すまで状態を確認します。エラーが返ればスタックが削除されたことを意味します。 aws cloudformation describe-stacks --stack-name apg-to-dsql --output text --query 'Stacks[*].StackStatus' CloudFormation スタックが削除されたら、次のコマンドで Aurora DSQL クラスターを削除します。 aws dsql delete-cluster --identifier $DSQL_CLUSTER_ID 続いて、次のコマンドでデータベーススナップショットを削除します。 aws rds delete-db-cluster-snapshot --db-cluster-snapshot-identifier migrate-to-dsql 最後に、本例でソースデータベースとして使った Aurora PostgreSQL クラスターを削除します 。 まとめ 本記事では、AWS Glue を使って Aurora PostgreSQL のスナップショットから Aurora DSQL へデータを移行する方法を紹介しました。AWS Glue はマネージドな並列実行環境を提供し、大量のデータを Aurora DSQL に短時間で移行しつつ、表現力の高いプログラミング言語でデータ変換を行えます。プロセス全体は複雑に見えますが、難しいのは AWS Glue スクリプトを書く部分だけで、その難易度は移行の複雑さや PySpark の習熟度によって変わります。本記事では、ご自身の移行に合わせて修正できるベーススクリプトを提供しています。 ブラウザベースの プレイグラウンド を使えば、AWS アカウントがなくても今日から Aurora DSQL を評価できます。プレイグラウンドは一時的なデータベース環境で、Aurora DSQL を素早く試して数分でハンズオンを始められます。 著者について Dan Blaner Dan は、データベースを専門とする Principal Solutions Architect です。シナジーの活用、パラダイムシフト、枠にとらわれない発想を楽しんでいます。閉所恐怖症で箱には入れないので、まず枠の中で考えること自体が得意ではないかもしれません。なぜ箱に入れと言うのか、ですって? 性格的に疑り深いところもあります。 この記事は Kiro が翻訳を担当し、Solutions Architect の Koji Shinkubo がレビューしました。




















