KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

イベント駆動処理をメンテナンスモードにするためにやったこと

はじめに

こんにちは。AI在庫管理チームソフトウェアエンジニアの坂本です。
今回はこちらの記事で松本さんが紹介していたメンテナンスモードの中で、イベント駆動処理のメンテナンスモードを開発するためにやったことを少し詳しく紹介できればと思います。

松本さんの記事のアーキテクチャ図を拝借すると、今回の記事は以下の赤枠の話が中心になります。

AI 在庫管理のメンテナンスモードとは

AI在庫管理のメンテナンスモードの主な目的は夜間のDBへのアクセスを止めて、長時間のDBのマイグレーションや再起動操作を実行することです。そのため、こちらで実行タイミングを制御することができないイベント駆動処理はメンテナンスモードでDBのアクセスを停止する必要があります。
(メンテナンスモードの全体的な目的や背景については先述の記事に詳しく記載されているため、本記事では割愛します。)

イベント駆動処理で行っていること

今回メンテナンスモードを開発する必要のあったイベント駆動処理は処方箋情報の取り込みです。

詳細については省きますが、S3に処方箋情報ファイルが連携されると、S3イベントで処方箋取り込みを行うLambda関数が起動するようになっています。
このLambda関数の内部では主に処方箋データの作成、処方された医薬品の在庫減算を行っています。

メンテナンスモードにおいて必要な機能

メンテナンスモードで必要な機能は大きく2つあります。

① メンテナンスモード中のイベントの保存

メンテナンスモード中に発生するイベントを過不足なく保存しておく必要があります。
この機能が正常に機能しない場合、顧客から連携されたデータによるイベントが正しく保存されません。その場合、不足したイベント分の処方箋取り込みに不整合が発生してしまいます。

② メンテナンスモード明けのリカバリ機能

メンテナンスモード明けに格納しておいたイベントからLambdaを実行し、リカバリをする必要があります。
この機能が正常に機能しない場合、①と同様に処方箋取り込みに不整合が発生してしまいます。

メンテナンスモードに使用した技術

主に以下のような技術を使用しています。

  • AWS AppConfig
  • Amazon SQS
  • AWS Step Functions

AppConfig

メンテナンスモードであるかを判定するフラグ管理ツールとして使用しています。
AppConfigではJSONデータでフラグ管理をしていて、is_in_maintenance がtrueの場合はメンテナンスモード、falseの場合はメンテナンスモードではないと判定するようにしています。

{"is_in_maintenance": true}

SQS

イベント起動を保存するために使用しています。
今回はリカバリの順序性が担保されなくても整合性が取れる処理だったため、FIFOキューではなく、標準キューを使用しています。 以下のようなJSONでリカバリするために必要な情報をSQSに保存するようにしています。

{
  "data_from": "複数の処方箋データ連携経路がありその経路を示す情報。",
  "s3_key": "配置されたデータの S3 Object Key",
  "receipt_handle": "SQS メッセージの receipt_handle、リカバリ後のメッセージの削除に必要"
}

AppConfigをチェックし、メンテナンスモードである場合はSQSに情報を保存します。

Step Functions

Step Functionsはメンテナンス明けのリカバリ時に可能な限り自動的に復旧処理を行うためのワークフローツールとして使用しています。

ワークフローツールとしてStep Functionsを採用した理由は、

  • 今回使用するAWSリソース(Lambda, SQS)とのインテグレーションが十分であること
  • チームとして利用実績が豊富であり知見があること
  • チームで利用しているServerless Frameworkでデプロイしやすいこと

などが挙げられます。

メンテナンスモードの実装

メンテナンスモード時

メンテナンスモード時は、Lambdaが実行された時の最初の処理でAppConfigにアクセスし、メンテナンスモードである場合の処理としてSQSにリカバリ処理に必要な情報をキューイングするようになっています。

メンテナンス明けのリカバリ

メンテナンス明けのリカバリではStateMachineを実行します。
StateMachine内では、メンテナンスモード時に格納されたメッセージを取り出すためのLambdaを実行します。このLambdaを実行するとSQSのメッセージを配列にしてStateMachineに返却します。
(receiveMessage自体はStateMachineから直接SQSのAPIを実行することも可能ですが、あえてLambda関数を実装した理由については後述します。)

その後、取得した配列をもとに順次処方箋取り込みのリカバリ処理を実施します。
この処理はStateMachineの Distributed Map を用いて並列でリカバリ処理を実行しています。処方箋取り込みが完了したらメッセージに格納されているReceiptHandleを用いてSQSからメッセージを削除します。

並列処理がすべて完了したらSQSにメッセージが残っていないか確認します。メッセージが残っている場合はSQSのメッセージを取得する処理から実行し直すループになるようにしています。

メッセージの取得に SQS receiveMessage API ではなく、Lambda を新規実装した理由

receiveMessage APIでは最大10件しかメッセージを取得できません。
リカバリを迅速に実行し、できるだけ早くメンテナンス状態から復帰したいというモチベーションがあるので、Lambdaから多くのメッセージを一度に返却し、並列処理を効率的に実行できるようにしました。

しかし、ここでもう1つ問題が起きました。それはStateMachineのペイロードサイズ制限の問題です。
StateMachineではペイロードで扱えるサイズが256KBまでと決まっています(参考)。つまり、今回のStateMachineの各ステップでのInput/Outputが256KB以下になっている必要があります。
そのため、SQSに256KB以上のデータサイズの情報が格納されている場合、メッセージをすべて取得するとStateMachineが異常終了してしまいます。
この問題に対応するため、今回は256KBを越えないメッセージ数に限定し、メッセージが残っている場合はStateMachineを先頭の処理からループさせる構成にしました。

おわりに

イベント駆動処理をメンテナンスモードにするための技術や実装を紹介させていただきました。
安全に使用できるメンテナンスモードをスコープを絞りながら短期間で実現できました。
S3 EventとLambda実行の間にキューを挟むべきでは?、イベントの保存にSQSは本当に適切だったのだろうか?と主にユーザビリティの観点でいくつか改善の余地がありそうだなと思っていますが、このあたりの改善課題は他開発タスクとの優先度を整理しつつ、より良いものにしていければと思っています。