Amazon Web Services ブログ

数億オブジェクトを扱う AWS DataSync の実装

本記事は 2024 年 2 月 7 日に Neil Baillie (Principal Consultant) 、Sean O’Sullivan (Cloud Infrastructure Architect) によって執筆された内容を日本語化したものです。原文は こちら を参照してください。

ハイブリッドクラウド環境間で大量のデータを移行するのは、オンプレミスとクラウドの両方のネットワーク、ストレージ、コンピューティング、オペレーティングシステムレイヤーの技術的な制限の中で様々な要件が生じるため、大変な作業に見えるかもしれません。ユーザーは、コンプライアンス要件を満たすためにタスクスケジュールと転送時間を最適化しながら、復旧時間目標 (RTO) と復旧ポイント目標 (RPO) のバランスを取る必要があり、さらに難しい課題に直面します。これらの課題は、ハイブリッド環境間のレイテンシーやネットワーク帯域幅の制限などの技術的制約、移行が必要なデータセットの大きさや量など、様々な形で現れます。

2024 年 2 月 7 日、AWS DataSync は、マニフェストをサポートすることを発表しました。これは、DataSync タスクで転送するソースファイルまたはオブジェクトの明確なリストを指定できる新機能です。マニフェストを使用すると、タスクで処理する必要がある一部のファイルまたはオブジェクトのみを指定することで、タスクの実行時間を短縮できます。

本記事では、DataSync が Amazon Simple Storage Service (Amazon S3) にある 1 億を超えるソースオブジェクトをどのように扱うのか、どのようにこれらのオブジェクトを自己管理型ストレージにコピー、同期するのかについて説明します。データ転送時の長い待ち時間のようなスケーリングを阻害する問題を防ぎ、より効率的にデータを移動するためのお勧めの方法を紹介します。

DataSync の概要

DataSync は、AWS への移行をシンプル化し、AWS へのデータ転送を加速します。オンプレミスのストレージ、エッジロケーション、他のクラウド、AWS Storage 間のデータ転送を迅速かつセキュアに行えるよう支援します。

大量のデータ (例えば 1 億件以上の S3 オブジェクト) の転送に DataSync を利用するユーザーは、DataSync に備わっている固有の上限値の範囲内を意識しながら、データ転送タスクを最適化する方法について戦略的に検討する必要があります。

マニフェストファイルの概要

マニフェストは、DataSync で転送したいファイルまたはオブジェクトのリストです。例えば、S3 バケット内のすべてのオブジェクトをコピーする代わりに、DataSync はマニフェストに含まれるオブジェクトのみをコピーできます。この機能により、ユーザーは DataSync で転送するものをより正確に制御できます。大規模に特定のファイルセットをターゲットにする場合、インクルードフィルターの使用よりもマニフェストファイルの利用が好ましいでしょう。

関連するクォータの確認

AWS DataSync のクォータでは次のように記載されています。

Datasync quotas and limitations

タスクごとのファイルまたはオブジェクトの最大数は 5,000 万個です。目標が 1 億個を超えるオブジェクトを対象とする場合は、まずこの最初の制限を克服する必要があります。
上記のタスクフィルターに含めることができる最大文字数とタスク実行履歴が保持される日数のクォータも、ソリューションの作成を進めるにあたって関連します。

ソリューションの概要

このソリューションには次が含まれます。

  • データの再編成: ソースデータを分割し、DataSync のタスク実行あたりの最大オブジェクト数の制限内に収めます。
  • マニフェストファイルを使ったイベント駆動: 新規または変更されたデータのみを処理するタスクを構成します。
  • インクルードフィルターを使った大量一括処理: 自動でソースデータを DataSync タスクが実行できるように大きなバッチに分割します。

前提条件

このソリューションでは、下記が必要です。

  • AWS アカウント
  • DataSync のエージェント
  • 中級程度の Python プログラミングスキル

ソリューションの説明

次のセクションでは、さまざまなアプローチについて説明します。

データの再編成

DataSync タスクを設定する際は、タスクのクォータである 5000 万件未満にオブジェクト数を抑えるようタスクを定義するため、ソースデータの構造を整理してください。この再構成を行う際のオプションを、次の例で示します。

  • タイムスタンプに基づいて、オブジェクトをパスごとに別のバケットに分割。
  • オブジェクトの数が閾値に達したら、新しいパスまたはバケットを使用。

ただし、ソースデータに関連する特定のワークロードや他の外部要因によっては、これらが現実的でない場合があります。

マニフェストファイルによるイベント駆動

イベント駆動型のアプローチを利用すると、DataSync で転送されるべきファイルのみが特定のタスク実行に含まれるようになります。manifest-config を指定した start-task-execution を呼び出すことで、DataSync タスク実行に対象のファイル/オブジェクトのリストを含むことができます。アプリケーションプログラミングインタフェース (API) のコードの詳細は、DataSync ドキュメントで確認できます。

マニフェストファイルによって、スキャンと処理の対象がマニフェストファイルに指定されたファイル/オブジェクトだけに制限されます。これによって DataSync の全体の実行時間が短縮され、Amazon S3 のスキャン費用も最小限に抑えられます。

Utlilizing an Amazon EventBridge schedule rule to invoke Lambda

Amazon S3 に新しいオブジェクトが作成されると、s3:ObjectCreated イベントが Amazon EventBridge に配信されます。Amazon S3 にアップロードされる新しいオブジェクトごとに、EventBridge はそのオブジェクトを記載する JSON ペイロードを、Amazon Simple Queue Service (Amazon SQS) キューに送信します。 Amazon SQS をイベントの宛先とすることで、これらのイベントが処理のために持続的に取得されます。

AWS Lambda は、Amazon SQS との統合によりダイレクトで同期したイベントペイロードを処理するために必要な計算リソースを提供します。この統合により、バッチウィンドウを利用することで、最大 5 分間レコードをバッファリングすることが可能になります。

ソースバケットへのオブジェクトのアップロード頻度によっては、5 分より長い時間バッファリングできます。より多くのオブジェクトを渡すことで、DataSync タスクの呼び出し回数が減る利点があります。ただし、転送に遅延が生じます。より大きなデータセットを転送する影響と、RPO への影響のどちらを重視するかを評価する必要があります。

5 分より長いバッファ時間を実装するには、スケジュールに従って実行する Amazon EventBridge ルールの作成を使用することができます。

A basic event-driven approach

Amazon SQS からのメッセージの取得

Amazon SQS キューからメッセージを読み取るために、Lambda 関数は boto3 Amazon SQS クライアントライブラリの get_queue_attributes と receive_messages API コールを使用します。

Lambda 関数は利用可能なメッセージ数を理解するために、キューの長さを取得します。この値は実行中のキューの残っているアイテム数と比較することができます。これにより、すべてのメッセージが取得されることが保証されます。あるいは、単一のタスク実行でオブジェクトの数を制限したい場合もあります。これは、キューにメッセージを残すことで実現できます (RPO に影響を与える可能性があります) 。または、複数のタスクを生成することで実現できます (DataSync でキューされた後に、全体のジョブの一部として単一のタスクを停止または再開する必要がある場合、運用上使いやすいかもしれません) 。

import boto3
import os

sqs_client = boto3.client('sqs')
queue_url = os.environ['QUEUE_URL']

# Get the SQS queue length
resp = sqs_client.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=[
        'ApproximateNumberOfMessages',
        'ApproximateNumberOfMessagesNotVisible'
    ]
)
approximate_queue_length = resp['Attributes']['ApproximateNumberOfMessages']

Amazon SQS のキューにあるメッセージ数を把握した後、最適なパフォーマンスを得るために各 DataSync タスクに送信するメッセージの総数を検討します。単一の DataSync タスクにメッセージを過剰にロードすると、遅延が発生する可能性があります。これは、DataSync がタスクを順次実行し、アクティブな DataSync タスクが完了するまでタスクがキューイングされるため、Amazon SQS キューに到着する新しいメッセージの全体的な処理時間に影響します。

この問題を緩和するために、DataSync タスクのオーバーヘッドを減らすように Amazon SQS メッセージをバッチ処理します。 例えば、5 分間で 1,000 メッセージをバッチ処理すると、1 つの DataSync タスクで毎秒 3.3 個の新規オブジェクトを処理できます。一度にバッチ処理するメッセージ数を選択するだけでなく、必要な RPO を確実に満たすために、5 分ごとなど、設定した間隔で Amazon SQS キューを空にする必要があります。

Amazon SQS のメッセージを取得するタイミングでは、receive_message API を使うと、1 回の API コールで Amazon SQS キューから最大 10 件のメッセージを受信できます。次のコード例では、メッセージ数が 1000 件 または指定した件数になるまで、while ループを使ってキューからメッセージを取得しています。

ボリュームの大きさを考えると、この関数ではパフォーマンスを最適化するために yield 文を使用します。これにより、キューから取得した値のリストはメモリに格納されません。


       QUEUE_URL = os.environ[“queue_url”] 
       MAX_RECEIVE = 20000

       def process_chunks(queue_length: int):

    processed_msg_count = 0
    max_receive = MAX_RECEIVE
    receive_count = 0

    # receive from SQS queue & build list of messages
    while receive_count < (max_receive / 10) and processed_msg_count < queue_length:

        receive_count += 1

        try:
            resp = sqs_client.receive_message(
                QueueUrl=QUEUE_URL, AttributeNames=["All"], MaxNumberOfMessages=10
            )
        except Exception as e:
            logging.error("SQS Retrieve Error")
            logging.error(traceback.format_exc())

        try:
            msgs = []
            for message in resp["Messages"]:
                msgs.append(message)
                processed_msg_count += 1
            yield msgs  # Messages to be sent to DataSync
        except KeyError:
            return

マニフェストの構築

Amazon SQS キューからメッセージを取得した後、DataSync にメッセージを渡す前にメッセージに対していくつかのアクションを実行する必要があります。

マニフェストファイルを構築するための手順:

  1. 各 Amazon SQS メッセージから、S3 オブジェクトの「Key」の値を抽出。
  2. Amazon S3 にマニフェストファイルを書き込み。
  3. 重複がないことを確認。
  4. フォルダやプレフィックスは含まれないことを確認。(マニフェストにはオブジェクトのみを含めることができる点に注意)

Amazon S3 にマニフェストファイルを書き込む際は、次のルールに従う必要があります:

  1. オブジェクトのフルパスを指定する必要があります。フォルダ/パス/ディレクトリだけを指定することはできません。オブジェクトの特定のバージョン (カンマ区切り) を転送するように指定できます。
  2. 各オブジェクトは改行文字で区切ってください。
  3. マニフェストファイルは csv ファイル (ファイル名.csv) として保存してください。

必要な IAM 権限:

  • Lambda: Lambda 関数の実行ロールには、マニフェストファイルを含むパス (バケット/フォルダ) への s3:PutObject 権限が含まれている必要があります。
  • DataSync: DataSync タスクの AWS Identity and Access Management (IAM) ロールには、マニフェストファイルが保存されているパス (バケット/フォルダ) への s3:GetObject 権限と、オブジェクトバージョンを指定する場合は s3:GetObjectVersion 権限が含まれている必要があります。

他の検討事項としては、マニフェストファイル名があります。同じタスクを再実行したい場合、タスクの実行中や実行後にマニフェストファイルを削除または変更してはいけません。1 つの戦略として、Lambda 呼び出しから実行 ID を取得してファイル名に使うことが考えられます。これにより、名前の重複を防ぎ、実行 API にファイル名を渡すことができます。この手法では、Lambda が継続してファイルを生成するため、時々マニフェストバケットをクリーンアップする必要があります。

Amazon SQS メッセージごとに、S3 オブジェクトの「Key」値を抽出:

  • Amazon SQS キューからのメッセージリストを、別の関数に読み込んで、各メッセージから S3 オブジェクトの Key 値を抽出できるようになりました。この関数は、このリストの一部であるディレクトリも処理し、マニフェストファイルから除外します。
def process_chunk_data(messages: list):

    manifest_data = []
    processed_msgs = []

    for msg in messages:
        msg_body = msg["Body"]
        payload = json.loads(msg_body)
        msg_key = payload["detail"]["object"]["key"]

        if msg_key[-1] != "/":  # Prevents Dirs from manifest
            manifest_data.append(msg_key)
        else:
            logger.info(f"Directory found. Skipping from manifest: {msg_key}")

        processed_msgs.extend(
            [{"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}]
        )

    return (manifest_data, processed_msgs)

重複する要素がなく、マニフェストファイルに保存される一意な要素のリストを確実に持つために、オブジェクトキーのリストを含むリスト変数 (manifest_data など) に対して Python の set メソッドを使用します。

マニフェストファイルを Amazon S3 に書き込む:

  • 転送対象のオブジェクトをマニフェストファイルに読み込んだ後、そのファイルをマニフェストバケットにアップロードする必要があります。

ファイル名を一意にするため、この例では、前述のとおり Lambda の呼び出しから context.aws_request_id を使用しています。

def publish_manifest(manifest_data: set, aws_request_id: str, idx: int):

    filename = f"{S3_MANIFEST_FOLDER}{aws_request_id}_{idx}.csv"

    result = s3_resource.Object(S3_BUCKET_NAME, filename).put(
        Body=','.join(manifest_data)[1:]
    )

    if result.get("ResponseMetadata").get("HTTPStatusCode") == 200:
        logger.info(f"Manifest Upload: Successful: Size: {len(manifest_data)} File: {S3_BUCKET_NAME} {filename}")

    else:
        sys.exit(1)

    return filename

DataSync 呼び出し

マニフェストファイルが作成され Amazon S3 にアップロードされると、DataSync タスクが実行されます。Lambda 関数は、マニフェストファイルのパスを参照して StartTaskExecution API を呼び出します。

    ds_client = boto3.client('datasync')
    DS_TASK_ARN = os.environ['ds_task_arn']

    ds_client.start_task_execution(
            TaskArn=DS_TASK_ARN,
          OverrideOptions={
              "VerifyMode": "ONLY_FILES_TRANSFERRED",
               "TransferMode": "ALL",
             },
           ManifestConfig={
           "Action": "TRANSFER",
            "Format": "CSV",
            "Source": {
                 "S3": {
                   "ManifestObjectPath": manifest_file,
                   "S3BucketArn": S3_BUCKET_MANIFEST_ARN,
                   "BucketAccessRoleArn": DS_S3_ROLE_ARN
                }
            }
            }
    )

Amazon SQS キューから正常に転送されたメッセージの削除

DataSync タスクが正常に実行された後、Amazon SQS キューのメッセージを安全に削除できます。前の process_chunk_data 関数では、タスクが正常に実行された場合に削除するメッセージを processed_msgs 変数に格納しています。この値は、次の delete_messages 関数に渡されます。

QUEUE_URL = os.environ["queue_url"]
def delete_messages(processed_msgs: list):

    total_msgs = len(processed_msgs)
    processed_msg_buffer = processed_msgs
    deleted_msg_count = 0

    while len(processed_msg_buffer) > 0:

        delete_msg_list = processed_msg_buffer[0:10]
        processed_msg_buffer = processed_msg_buffer[10:]

        try:
            resp = sqs_client.delete_message_batch(
                QueueUrl=QUEUE_URL, Entries=delete_msg_list
            )
            deleted_msg_count += len(resp["Successful"])
        except Exception as e:
            logging.error(traceback.format_exc())

    if deleted_msg_count != total_msgs:
        raise RuntimeError(
            f"Failed to delete messages: total messages={total_msgs!r} resp={resp!r}"
        )
    else:
        logger.info(f"Deleted {deleted_msg_count} from the queue")
    return total_msgs

インクルードフィルターを利用した大容量バッチ

最初に大量のデータをシードする必要がある場合、大容量バッチを移動するには DataSync の使用が適しています。オフライン転送の代替手段として、そのタスクを実行頻度や期限によっては AWS Snowball を使用する方法があります。また、イベント駆動型のアプローチと組み合わせて、データセット全体の検証を不定期に実行する場合も DataSync が適しています。

DataSync では、StartTaskExecutionVerifyMode オプションを POINT_IN_TIME_CONSISTENT に設定して呼び出すことで、データセット全体をチェックおよび検証できます。この場合、目的はデータセット全体を転送して検証することであるため、マニフェストを使用する必要はありません。VerifyMode を他のユースケースのマニフェストとともに使用する場合、オペレーションごとに特定し、転送するファイルのみをターゲットにするために ONLY_FILES_TRANSFERRED を指定する必要があります。

Using DataSync include filter with an EventBridge schedule rule

オブジェクトのクォータ

バッチアプローチを使用する場合、上限のクォータがどこにあるかを理解するために、さらにいくつかの理論上の上限について説明する必要があります。

  • タスク実行ごとの最大ファイル数またはオブジェクト数は、5000 万件です。
  • 単一のタスクにキューイングできるタスク実行の最大数は、50 件です。
  • 最大数のタスクがキューイングされた場合のタスクごとの最大オブジェクト数は、25 億件です。

まずタスクがニーズに十分な規模かどうかを評価します。十分でない場合は、複数のターゲットをセットした複数のタスクを定義することを検討してください。

フィルターのクォータ

タスクにオブジェクトを含めるには、インクルードフィルターを使用します。フィルターの最大長は 102,400 文字です。このクォータに達するタイミングと方法は、オブジェクトキーで使用されるパス構造によって異なります。パスプレフィックスが長いほど、フィルターの連結が最大長に達するまでの時間が短くなります。

フィルター文字列の作成

フィルターを作成する際は、タスク呼び出しを効率的に使用するために、タスクの最大値を超えないようにオブジェクトを含めてください。

フィルターオブジェクトは次のようになります:

includes = [{
    "FilterType": "SIMPLE_PATTERN",
    "Value": str(datasync_filter)
}]

UUID ベースのバッチ処理

フィルターの長さは、キープレフィックスが UUID (Universally Unique Identifier) を使用している場合、予測可能です。UUID は、各プレフィックスが一意になるように 16 進数 (Hex) で構築されます。詳細は次の表をご覧ください。

Batching UUID digits and prefixes

フィルターに含まれる桁数が増えると、フィルターに含まれるオブジェクトの数は減少します。桁数が多すぎると、必要なフィルターのプレフィックスの数が増え、全体のフィルター長が長すぎることがあります。含まれる桁数が少なすぎると、オブジェクトが多すぎることがあります。プレフィックスフィルターに含める最適な桁数を見つけるには、調査が必要となります。

例えば、オブジェクトキーでパスプレフィックスが 21 文字あり、1 桁のフィルターが 1 つある場合、21 + 1 = 22 文字になります。

1 桁の場合、16 個のプレフィックスグループが作成されます。区切り文字に | を使用するため、この場合のフィルター文字列の長さは (22 * 16)+ 15 = 367 文字になります。

3 桁の場合、フィルターは 21 + 3 = 24 文字で、プレフィックスグループは 4096 個になります。フィルター文字列の長さは (24 * 4096)+ 4095 = 102,399 文字となり、これは最大長以内です。桁数を増やすと文字列が長くなりすぎて最大長を超えてしまいます。

これらの例はすべて、UUID のサンプルセットが十分に大きく、エントロピーも十分にあり、オブジェクトが 1 桁のマッチプレフィックスに対して均等に分散していることを前提としています。

大容量バッチ UUID のコード例

次のサンプルコードは、前述の最適な処理を実装する方法を示しています。

Python を使用して UUID プレフィックスのフルセットを生成します:

datasync_filter = ''

number_of_digits = 3
prefix = 'path/to/objects/'

hex_range = range(0, 16**number_of_digits)
uuid_prefixes = [f"{i:x}".zfill(number_of_digits) for i in hex_range]
# These can then be joined with the path prefix to create filter.
for uuid_prefix in uuid_prefixes:
    # Append the Prefix to the Filter, adding delimiter for all but the first
    if datasync_filter:
        datasync_filter += '|'
    datasync_filter += f'{prefix}{uuid_prefix}*'

datasync_filterインクルードフィルターで使用できます。

クリーンアップ

使用されていないリソースは削除し、課金されないようにしてください。削除対象には Lambda 関数、Amazon SQS キュー、Amazon S3 内の使用していないサンプルデータが含まれます。

まとめ

本記事では、AWS DataSync を使って最大 1 億を超えるオブジェクトからなる大規模なデータセットをバッチ処理、処理、転送、検証する方法について説明しました。マニフェストを紹介し、この DataSync の新機能をイベント駆動型アーキテクチャの一部として使用して、タスクで処理が必要なファイルまたはオブジェクトのみを指定することで、タスク実行時間を短縮できることを確認しました。加えて、初期の大量データ転送が必要な顧客向けに、DataSync の インクルードフィルターを使う方法を実演しました。また、AWS DataSync サービスのクォータの一部を概説し、これらのしきい値を回避するためのデータ転送効率化方法を提案しました。

AWS DataSync とマニフェストの詳細については、以下のリンクを参照してください:

Neil Baillie

Neil Baillie

Neil Baillie は AWS ProServe の Principal Consultant であり、英国中央政府チームの一員です。Neil は 20 年以上の業界経験があり、クラウドアーキテクチャとエンジニアリングのバックグラウンドを持っています。 彼は、お客様によるプロジェクトの実施と AWS の可能性の実現を支援することに重点を置いています。

Sean O’Sullivan

Sean O’Sullivan

Sean O’Sullivan は AWS ProServe のクラウドインフラストラクチャアーキテクトであり、EMEA プロフェッショナルサービスチームの一員です。業界で 7 年以上にわたり、お客様と協力してデジタルトランスフォーメーションプロジェクトを推進し、AWS クラウドを最大限に活用するためのソリューションの設計、自動化、再設計を支援してきました。