はじめに 背景 Azure Data Factory利用時の注意点 Azure DatabricksとAzure Data Factoryの比較 Azure Databricksの利用方法 Azure Databricksの作成 クラスターの作成 PySparkの記述方法 storesコレクション ordersコレクション merged_storesコレクション 終わりに はじめに 電通 総研XI本部AIトランスフォーメーションセンターの岩本です。この記事では、Azure Cosmos DB for MongoDBのデータベース移行手段としてAzure DatabricksとAzure Data Factoryを取り上げ、それぞれの方法のメリット・デメリットを比較します。また、Azure Data Factoryを用いた具体的な実装例を紹介します。 背景 AIトランスフォーメーションセンターでは、Azure OpenAI Serviceを活用したAIチャット・RAGの製品であるKnow Narratorを開発しています。Know NarratorのバックエンドではスケーラブルなNoSQLデータベースであるAzure Cosmos DB for MongoDBを利用しています。理由としては以下の2点です。 Know Narratorのユーザー数は導入いただいているお客様によって様々であり、DBはスケーラブルである必要がある 頻繁なアップデートで後から機能を追加することを想定しているため、DB スキーマ は柔軟に変更可能である必要がある AIトランスフォーメーションセンターでは、今年3月にデータベース移行を伴うKnow Narratorの大型アップデートを顧客に提供しました。データベース移行手段として当初はAzure Data Factoryを検討していましたが、Data Factory上でのUUIDの取り扱いに制限があることが分かったため、最終的にはAzure Databricksを利用することに決定しました。 Azure Data Factory利用時の注意点 UUIDはMongoDB上では binary と type の2つのフィールドを持つオブジェクトとして表現されます。 type はUUIDの生成方法の違いを表しており、これが異なるとUUIDの一意性を維持できません。 CosmosDB for Mongo API に対しData Factoryを利用する際の注意として、元々DBに type=4 として保存している値が、Data Factory上にデータを読み出す際に type=3 として認識されてしまいます。Data Factory上で type を書き換えることはできますが、DBに書き出す際に再度 type=3 となってしまいます。 Microsoft サポートによると、これはData FactoryのCosmosDB for MongoDBコネクタの仕様であり、解消するには書き込み後にCosmosDB側で type を書き換えるコマンドを実行する必要があるとのことでした。 マイグレーション 作業手順が複雑になることを避けるため、今回はData Factoryを利用しませんでした。 Azure DatabricksとAzure Data Factoryの比較 以下のようにAzure DatabricksとAzure Data Factoryはそれぞれ異なる目的と機能を持っています。 Azure Databricks: 高度な分析、データサイエンス、 機械学習 のためのプラットフォーム。 Apache Sparkベースで、高速な ビッグデータ 処理が可能。 ノートブック環境を提供し、データ探索、視覚化、モデルのト レーニン グが容易。 Python 、 Scala 、 SQL 、Rなど多くの言語をサポート。 Azure Data Factory: データ統合とETL(Extract, Transform, Load)パイプラインの構築に特化。 データの移動、変換、 オーケストレーション を視覚的にデザイン可能。 さまざまなデータソース(オンプレミスや クラウド )と連携。 定義されたスケジュールやイベントに基づいたデータパイプラインの実行。 Databricksはデータ処理エンジンとして Apache Sparkを使用しており、Data Factoryに比べて非常に高速にデータ処理を行うことができます。また、料金体系に関してもDatabricksがランタイムの稼働時間のみに課金されるのに対し、Data Factoryはランタイムの稼働時間に加えてデータ量や計算量に応じた課金となるため、一度に大量のデータを処理する場合にはDatabricksが適していると言えます。 また、Databricksは Python や SQL で処理を記述できるため複雑なデータ操作に適しています。Data Factoryは GUI で直感的に処理フローを構成できますが、その分処理の自由度は落ちます。今回のDB マイグレーション では単純にデータを移行するだけでなく、あるコレクションのドキュメントを別のコレクションのドキュメントのフィールドの 入れ子 にするなどある程度複雑な操作が伴いました。このような操作はData Factoryでも実現不可能ではありませんが、複雑な操作は GUI ではかえって時間がかかってしまうためDatabricksを選択しました。 Azure Databricksの利用方法 Azure Databricksの作成 Azure Portal 上でリソースを作成します。 DatabricksからCosmosDBに接続するには、DatabricksのリソースをCosmosDBと同じVNetに配置する(VNetインジェクション)必要があります。 DatabricksのVNetインジェクションでは2つのサブネット(コンテナサブネットとホストサブネット)を利用するため、これらのサブネットを新たにCosmosDBが配置されているVNet上に作成する必要があります。サブネットの範囲はDatabricksで使用できる クラスタ ーノード数に影響するため、 /26 より小さいサブネットは推奨されません。 サブネット範囲と クラスタ ーノード数の具体的な関係については以下のページに記載されています。 learn.microsoft.com クラスタ ーの作成 ワークスペース を起動し、コンピューティング クラスタ ーを作成します。デフォルトの設定値でも特に問題はありませんが、ワーカータイプを安価なものに変更したり、開発環境であればスポット インスタンス を選択するなどしてコストを抑えることができます。 クラスタ ー作成後、ライブラリタブの[新規をインストール]をクリックし、以下の Maven 座標を使って Apache SparkとMongoDBのコネクタをインストールします。 org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 ノートブックを作成し、[接続]をクリックして先ほど作成した クラスタ ーを選択します。 クラスタ ーの起動が完了していれば以下のように クラスタ ー名が表示され、 Python コードが実行可能な状態となります。  を結合し、ネストされたフィールドをもつコレクションmerged_storesを作成するというシナリオでサンプルコードを以下に示します。 storesコレクション _id store_id phone-number region 60c5ba5e4f1a2f6d2c9b7f92 {"type": 4, "data": "U3RvcmUx"} 123-456-7890 North 60c5ba5e4f1a2f6d2c9b7f93 {"type": 4, "data": "U3RvcmUy"} 987-654-3210 South ordersコレクション _id order_id price created_at store_id 70d5ba6e4f1a2f6d2c9b7f94 {"type": 4, "data": "T3JkZXIw"} 100 2021-06-13 12:00:00 {"type": 4, "data": "U3RvcmUx"} 70d5ba6e4f1a2f6d2c9b7f95 {"type": 4, "data": "T3JkZXIx"} 150 2021-06-14 13:00:00 {"type": 4, "data": "U3RvcmUx"} 70d5ba6e4f1a2f6d2c9b7f96 {"type": 4, "data": "T3JkZXIy"} 200 2021-06-15 14:00:00 {"type": 4, "data": "U3RvcmUy"} 70d5ba6e4f1a2f6d2c9b7f97 {"type": 4, "data": "T3JkZXIz"} 250 2021-06-16 15:00:00 {"type": 4, "data": "U3RvcmUy"} merged_storesコレクション _id store_id phone-number region orders 60c5ba5e4f1a2f6d2c9b7f92 {"type": 4, "data": "U3RvcmUx"} 123-456-7890 North { id: 70d5ba6e4f1a2f6d2c9b7f94, order_id: {"type": 4, "data": "T3JkZXIw"}, price: 100, created_at: 2021-06-13 12:00:00 }, { id: 70d5ba6e4f1a2f6d2c9b7f95, order_id: {"type": 4, "data": "T3JkZXIx"}, price: 150, created_at: 2021-06-14 13:00:00 } 60c5ba5e4f1a2f6d2c9b7f93 {"type": 4, "data": "U3RvcmUy"} 987-654-3210 South { id: 70d5ba6e4f1a2f6d2c9b7f96, order_id: {"type": 4, "data": "T3JkZXIy"}, price: 200, created_at: 2021-06-15 14:00:00 }, { id: 70d5ba6e4f1a2f6d2c9b7f97, order_id: {"type": 4, "data": "T3JkZXIz"}, price: 250, created_at: 2021-06-16 15:00:00 } # 必要なライブラリをインポート from pyspark.sql import SparkSession from pyspark.sql.functions import collect_list, struct from pyspark.sql.types import ( BinaryType, ByteType, IntegerType, StringType, StructField, StructType, TimestampType, ) # データベース接続に必要な情報を設定 src_connection_string = "<移行元DBの接続文字列>" dest_connection_string = "<移行先DBの接続文字列>" source_db = "<移行元DB名>" target_db = "<移行先DB名>" stores_collection = "stores" orders_collection = "orders" merged_stores_collection = "merged_stores" # SparkSessionの初期化 my_spark = SparkSession.builder.appName( "myApp" ).getOrCreate() # storesコレクションのスキーマを定義 stores_schema = StructType( [ StructField( "_id" , StructType([StructField( "oid" , StringType(), True )]), True ), StructField( "store_id" , StructType( [ StructField( "subType" , ByteType(), False ), StructField( "data" , BinaryType(), True ), ] ), True , ), StructField( "phone-number" , StringType(), True ), StructField( "region" , StringType(), True ), ] ) # storesコレクションからデータを読み込み df_stores = ( my_spark.read.schema(stores_schema) .format( "com.mongodb.spark.sql.DefaultSource" ) .option( "uri" , src_connection_string) .option( "database" , source_db) .option( "collection" , stores_collection) .load() ) # ordersコレクションのスキーマを定義 orders_schema = StructType( [ StructField( "_id" , StructType([StructField( "oid" , StringType(), True )]), True ), StructField( "order_id" , StructType( [ StructField( "subType" , ByteType(), False ), StructField( "data" , BinaryType(), True ), ] ), True , ), StructField( "price" , IntegerType(), True ), StructField( "created_at" , TimestampType(), True ), StructField( "store_id" , StructType( [ StructField( "subType" , ByteType(), False ), StructField( "data" , BinaryType(), True ), ] ), True , ), ] ) # ordersコレクションからデータを読み込み df_orders = ( my_spark.read.schema(orders_schema) .format( "com.mongodb.spark.sql.DefaultSource" ) .option( "uri" , src_connection_string) .option( "database" , source_db) .option( "collection" , orders_collection) .load() ) # 初期状態のstoresデータフレームの列名を保存 stores_new_columns = df_stores.columns # 結合時に列名の重複を避けるためstoresデータフレームの列名を変更 df_stores = df_stores.toDF( *[column_name + "_stores" for column_name in df_stores.columns] ) # storesデータフレームとordersデータフレームを結合 df_joined = df_stores.join( df_orders, df_stores.stores_id_stores == df_orders.stores_id, "left_outer" , ) # stores_idを除外したうえでordersをグループ化 orders_new_columns = df_orders.columns orders_new_columns.remove( "stores_id" ) df_merged_stores = df_joined.groupBy(df_stores.columns).agg( collect_list(struct(orders_new_columns)).alias( "orders" ) ) # 列名リストにordersを追加したうえで重複を避けるために変更した列名を元に戻す stores_new_columns.append( "orders" ) df_merged_stores = df_merged_stores.toDF(*stores_new_columns) # 結合されたデータを新しいコレクションに保存 df_merged_stores.write.format( "mongo" ).mode( "append" ).option( "uri" , dest_connection_string ).option( "maxBatchSize" , 1024 ).option( "database" , target_db).option( "collection" , merged_stores_collection ).save() 注意すべき点として、ネストされたフィールドをもつコレクションは読み込み時に スキーマ が自動で推論されません。 スキーマ を指定しないと、ネストされたフィールドの値は構造化されたデータではなくただのテキストとして扱われてしまいます。これを避けるため、サンプルコードではDBからのデータ読み込み時に スキーマ を指定するようにしています。 終わりに この記事ではAzure Cosmos DB for MongoDBのデータベース移行に関してAzure DatabricksとAzure Data Factoryを比較し、Azure Databricksを使った実装例を紹介しました。最後までご覧いただきありがとうございました。 私たちは一緒に働いてくれる仲間を募集しています! AI系プロジェクトマネージャー/リーダー(◎AIコンサル ◎AIプロジェクトマネージャー) AIサービス開発エンジニア 執筆: @iwamoto.yoshik85ca341e9dca4b22 、レビュー: @miyazawa.hibiki ( Shodo で執筆されました )