KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

StepFunctionsを使って大容量データを捌く🥳🎉🎉

すっかり秋めいてきましたね🍁
プラットフォームチームの石黒です。

我々プラットフォームチームでは、顧客向けのアカウント管理機能を提供しています。その中にはユーザーの新規登録機能もあるのですが、薬局の規模が大きくなるにつれ所属する薬剤師の数も増えるということで、ユーザー管理にかかるお客様の時間を少しでも減らすべく、CSVでユーザーを大量登録できる「ユーザー一括登録機能」を提供しています✨

CSVでのデータ取り込みは業務用システムでは馴染みのある機能かと思いますが、サーバーレスで実現しようとするとなかなか難しく、躓きポイントがたくさんありました。。
その中でも、大容量データを扱うためのアーキテクチャについて工夫したことと直面した問題をご紹介します!

実現したいこと

プロダクトマネージャーと相談して決めた要件は、大まかに以下の3つになりました。

① ユーザーがCSVファイルをアップロードできること
② 大容量データに対応すること
③ ユーザーが登録結果を確認できること

v2UploadUser

これらを実現するためには、各ユーザーの登録工程を管理しつつ、処理全体の進捗も把握する必要があります。SNSとSQS・EventBridgeなどを組み合わせたイベント駆動処理でも非同期かつスケーラブルな登録処理は実現できるものの、処理全体の進捗が把握しづらい😣という懸念があったため、今回はStepFunctionsを採用しました。

アーキテクチャ(当初案)

最初に考えたアーキテクチャは、CSVのパース・ユーザー登録・ステータスの更新を順に行っていくもので、シンプルなワークフローです。

工夫した点として、ユーザー登録Mapイテレータの中で別のStepFunctionsワークフローを呼び出しています。ユーザー登録処理は今回の一括登録以外の場面でも使われる機能であるため、別のワークフローに切り離して再利用できるようにすることが狙いです。
また、リクエストユーザーがワークフローの進捗を確認できるように、DynamoDBに処理ステータスを保持し、各タスクの完了に応じて更新しています。

ビジネスロジックを含んだ複雑な処理が求められる場合はLambda関数内で更新しますが、常に同じ値を更新する場面ではStepFunctions上で直接DynamoDBを更新する方が簡単なのでこれを利用しました。

初期アーキテクチャ

【問題①】子ワークフローに親の実装が依存してしまう👶

さて、ユーザー登録処理を担うワークフローを切り離し、親ワークフローから呼び出す仕様にしましたが、ここで問題が発生しました。。

全体の進捗把握とは別に個々の登録結果もDynamoDBに保持しようとすると、Mapイテレーター、つまりユーザー登録ワークフローの中で保存処理を用意する必要があります。そうなるとせっかく切り離したはずのワークフローに依存関係ができてしまう…という問題です。

問題点が分かりやすい絵

そこで、ユーザー登録ワークフローを呼び出し、結果をDynamoDBに保存するタスクを持つワークフローでさらにラップすることにしました。合計でワークフローが3重にラップされています。

このようにワークフローをネストすることで、ユーザー登録ワークフローの独立性を保ったまま利用することができます。
一方で、ネストすることで入出力JSONの階層が深くなるため、Path指定がより複雑になってしまうケースがありました。この問題は、入出力処理をシミュレートできるデータフローシミュレーターを使用して乗り切りました。

改善案1

【問題②】ペイロードサイズが足りない😇

今回の仕様ではCSVのパース後、DynamoDBに入力値(結果レコード)を保持することにしました。
これも問題①同様、子ワークフローには関心のないことなのでラップしたワークフローの1タスクとしてしまいたいところですが、何も考えずデータをロードし次の登録処理に受け渡そうとすると、入力値によってはペイロード上限を超えてしまう恐れがあります。
ペイロード上限を超えてしまった場合、StepFunctionsはその時点でエラー終了してしまいます。

StepFunctionsのタスク間のペイロード上限は2020年9月に256KBまで引き上げられたものの、やはり大きなサイズのデータ受け渡しには向きません。
AWS公式のベストプラクティスでは一時的にS3にデータを保存しARNを受け渡す方法が紹介されていますが、今回はDynamoDBに全レコードの情報を保持していることから別の方法を採用しました。

改善案2

改善したアーキテクチャでは、CSVのパース後、以下のようなページング情報をペイロードに含め、Mapに渡します。
各要素では、Mapイテレータで受け取ったページング情報をもとにDynamoDBからデータをロードし、安全なサイズでユーザーの登録処理が実行できるようになりました。

{
    "data": [
    {
      "eventId": "xxxx",
      "userId": "xxxx",
      "limit": 50,
      "offset": 0
    },
    {
      "eventId": "xxxx",
      "userId": "xxxx",
      "limit": 50,
      "offset": 50
    }
  ]
}

この方法による恩恵はペイロードサイズの問題だけでなく、並行でMapワークフローが実行されるため処理速度の短縮にも繋がります。
MapのMaxConcurrencyオプションで並列数の制御ができるため、登録処理に使用しているAWSサービスのクォータに影響が出ない程度に設定しました。

まとめ

StepFunctionsを用いた大容量データ処理のアーキテクチャを紹介しました✨

  • 共通ロジックは別ワークフローにして再利用可能にする
  • 独自のロジックは分離したワークフローをラップして新しいワークフローにする
  • 大容量データはページングしてデータロードを分散する

今回は紹介できませんでしたが、StepFunctionsのコードは全てCDKで定義しました🚀
その中で躓いたことや役立つTipsなど得られるものがたくさんあったので、別の機会にご紹介できたら嬉しいです!
皆さんも良いStepFunctionsライフをお送りくださいませ🤞