KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

【Airflow】夜間バッチが心配で夜も眠れない...なら夕方にも動かせるバッチ処理のE2Eテスト環境を作ってしまおう!

こんにちは、カケハシでMusubi Insightのバックエンドエンジニアをしている末松です。

夜間などに動くバッチ処理を開発・メンテしているエンジニアの方ならきっと共感していただけると思うのですが、夜間バッチって心配になりますよね。

朝早起きして結果を確認したり、夜更かししてリアルタイムで張り込みをしたり...

数名いる Musubi Insight のバックエンドエンジニア陣も同様で、バッチ処理に何かしらの変更を加えた夜は結果が気になってしまいあまり熟睡できなかった、ということもしばしば。

この記事では、そんなエンジニア陣の救世主となったバッチ処理のE2Eテスト(※)環境を作ったお話になります。

※ エンドツーエンドテスト。本来はアプリケーションのシステム全体を通してのテストのことを指すが、ここではバッチ処理全体を通したテストのことを指す。

Musubi Insightのバッチ処理について

Musubi Insight は薬局の経営改善に役立つ様々なデータを可視化しているプロダクトで、そのデータは主に日次の夜間バッチで生成されています。

夜間バッチに使っている技術や仕組み等は、以下の記事で紹介させていただいています。

今まで発生したエラーの数々

正式リリースから早2年以上が経過していますが、これまで様々な原因で夜間バッチにエラーが発生し Musubi Insight のデータ欠損が発生、ユーザーの方々にご迷惑をおかけすることがありました。

その原因は本当に様々で、全てを完璧に予測することは困難を極めました。

  1. PySpark スクリプトにバグが混入
  2. Airflow における依存関係の設定ミス(必要なデータが揃っていない状態で処理が動いてしまう)
  3. AWS リソース等の人的な操作ミス
  4. データ量増加に伴う、メモリ・ディスク容量不足やタイムアウト
  5. 意図しない例外データが含まれていた

これまでのエラー防止・検知の取り組み

カケハシでは障害発生後にふりかえりを行う文化があり、関係者が集まって恒久対応案や再発防止策、改善アイデアを出し合います。

Musubi Insight 開発チームもバッチ処理エラーの度にふりかえりを行い、そこで出た夜間バッチのエラー防止・検知のためのアイデアを実行に移してきています。

  • PagerDuty を用いたアラート体制の構築
  • Datadog を用いた AWS リソースのモニタリング体制の構築
  • AWS Glue リソースを github 上でコード管理をするようにすることで、必ずレビューが通るような仕組みづくり

バッチ処理のE2Eテスト環境を作るに至った理由

通常業務の隙間を縫って改善を進めてきたこともあり、未だ実現できていないエラー防止のアイデアが溜まっている状態でした。

  1. 夜間だけでなく夕方にもバッチ処理を回すことで、事前にエラーを検知する(以降、夕方バッチ)
  2. PySparkのユニットテスト環境の構築
  3. バッチ処理の develop や staging 環境の構築

次に着手すべき改善を検討した結果、以下の理由から 1 の「夕方バッチ」の実現に向けて動き出すことに。

  • 実現するための工数が他のアイデアと比べて少ない
  • 近々バッチ処理全体に関わる修正が入ることがわかっており、その動作検証に活用できる

通常業務と並走しながら約3週間かけて実装・検証を行い、実現に至りました。

夕方バッチとは

チームでは「夕方バッチ」と呼んでいますが、通常夜間にのみ動いているバッチ処理を夕方にも1度動かすことで事前にエラーを検知する仕組みになります。

Musubi Insight のバッチ処理の流れを図にすると以下のようになりますが、

バッチ処理の流れ

この「Glue job でのデータ集計」の処理のみを夕方に実行する仕組みになります。

夕方バッチの制約と仕様

ただし夕方バッチにはいくつかの制約があり、

  • 本番稼働しているプロダクトに影響を与えないこと
  • バッチ処理を動かすため、コスト面から毎日実行することは避けたい

これらの制約を満たせるように、以下の仕様となりました。

  • Airflow のフローにおいて、Musubi Insight の データベース(RDS)へのデータロードはスキップさせる
  • バッチ処理に変更が入った時のみ手動実行可能なオンデマンド形式にする

夕方バッチで検知したいエラー

下記のエラー(今まで発生したエラーの数々 のチャプターより再掲)

  1. PySpark スクリプトにバグが混入
  2. Airflow における依存関係の設定ミス(必要なデータが揃っていない状態で処理が動いてしまう)
  3. AWS リソース等の人的な操作ミス
  4. データ量増加に伴う、メモリ・ディスク容量不足やタイムアウト
  5. 意図しない例外データが含まれていた

のうち、1~3を夕方バッチを用いることで検知・防止ができると考えています。

夕方バッチの実装方法

ここからは、Airflow で管理されている Musubi Insight のデータ集計フローをどのように夕方バッチに適用させたかについての話になります。

まず前提として Musubi Insight のデータ集計フローは下記のように2つの DAG に別れており、

MIのデータ集計フロー

TriggerDagRunOperatorによって export_dag が完了したら mi_dag が動き出すようになっています。

夕方バッチ用のDAGを作成

夕方バッチ用の DAG mi_test.py を作成し、この DAG をオンデマンド実行することで mi_dag が実行されるようにします。

DAGを定義

from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.models.taskinstance import Context


def on_failure_callback(context: Context):
    # slack通知

# 全てのtaskに共通の設定
args = {
    "provide_context": True,  # DAG や DagRun の各種パラメータを持つ context を Task に渡す
    "depends_on_past": False,  # 指定したタスクの上流タスクの実行が失敗した場合タスクを実行するか
    "on_failure_callback": on_failure_callback,
    "retries": 0,
    "execution_timeout": timedelta(minutes=180),
}
with DAG(
    dag_id="mi_dag_test",
    default_args=args,
    schedule_interval=None,  # オンデマンド実行
    start_date=days_ago(1),
    catchup=False,  # backfillしない
) as dag:
    # 省略

■ コード解説
default_argsには、その DAG の中の Task に共通で適用される設定を渡しています。

mi_dag のテスト実行

次に作成した mi_test.py のDAGの中で、テスト実行をさせたいDAG mi_dag を呼び出す Task を定義します。

from airflow.operators.dagrun_operator import TriggerDagRunOperator

# 省略

    trigger_mi_dag = TriggerDagRunOperator(
        task_id=f"trigger_mi_dag",
        trigger_dag_id="mi_dag",
        conf={"is_test": "1"},  # ※
        wait_for_completion=True,
        failed_states=["failed"],
    )

is_test は後続の処理の関係上文字列として扱われてしまうため、最初から文字列にしています。

■ コード解説
夕方バッチの実行かどうかを判別するため、DAG の実行パラメータ confis_test を渡しています。
conf にパラメータを設定することで、実行した DAG( DagRun )にそのパラメータが渡されます。

コード解説1

mi_dag で本番データベースへのロードをスキップさせる

続いて mi_dag の方も修正を行い、渡された is_test の値によってデータベースへのロードを行うかどうかを判断させます。

整理するとこのようになります。

夕方バッチの実行対象

データベースへのロード用の Task にパラメータを追加

is_test は DAG の実行(DagRun)ごとに内容が異なるため、Task の内部で評価・判定をする必要があります。

Airflow における Task は、PythonOperatorTriggerDagRunOperatorBashOperator と言った Operator で定義されます。

例えば PythonOperator の場合、python_callable に設定された関数が Task 内部で実際に実行されることになります。

そのため下のコードで定義している関数では is_test の評価・判定は行っていません。( is_testcontext によって Task に渡されます)

from airflow.operators.python_operator import PythonOperator

def loadjob(task_id: str, job_name: str, job_args: dict):
    return PythonOperator(
        task_id=task_id,
        python_callable=start_gluejob,
        op_kwargs={
            "job_name": job_name,
            "job_args": job_args,
            "is_skip_test": True,  # 夕方バッチではスキップする
        },
    )

args = {
    "provide_context": True,  # 有効化する
    ...
}
with DAG(
    dag_id="mi_dag",
    default_args=args,
    ...
) as dag:
    ...

■ コード解説
loadjob の関数は、本番DBへのロード処理の Task を生成する Factory になっています。
夕方バッチではスキップしたい処理であるため、 python_callable の関数に渡されるパラメータ op_kwargsis_skip_test を追加しています。
また Task の共通設定で provide_context を有効化していることで、is_testDagRun に渡されたパラメータも Task に渡るようになり、Task 側では op_kwargscontext が合わさったパラメータを受け取ることになります。

コード解説2

is_test を評価して gluejob を実行するか判定する

python_callable の関数側に、is_test の評価・判定を追加していきます。

from airflow.exceptions import AirflowSkipException


def start_gluejob(job_name, is_skip_test=False, **kwargs):
    """
    PythonOperator の python_callable にこの関数が指定された場合、
    kwargs の中に dag や dagrun の各種パラメータを持つ context の情報も含まれる。
    """
    is_test = (
        kwargs["dag_run"].conf.get("is_test") == "1" if "dag_run" in kwargs else False
    )

    # 夕方バッチではスキップさせる
    if is_skip_test and is_test:
        raise AirflowSkipException

    job_args = kwargs["job_args"] if "job_args" in kwargs else {}
    if is_test:
        job_args["--IS_TEST"] = "1"

    # glue job を実行(詳細は割愛)
    run_job(job_name, job_args)

■ コード解説
関数の引数の kwargscontextop_kwargs のパラメータが含まれています。
またこの関数では、Musubi Insight のデータ集計を行う GlueJob が呼び出されるのですが、GlueJobにも --IS_TEST のパラメータを渡すことでテスト実行であることを教えています。

コード解説3

夕方バッチの効果

夕方バッチの運用が始まってから、以下のエラー(今まで発生したエラーの数々 のチャプターより再掲)における 1~3 のエラーを防止できることができるようになりました。

  1. PySpark スクリプトにバグが混入
  2. Airflow における依存関係の設定ミス(必要なデータが揃っていない状態で処理が動いてしまう)
  3. AWS リソース等の人的な操作ミス
  4. データ量増加に伴う、メモリ・ディスク容量不足やタイムアウト
  5. 意図しない例外データが含まれていた

4,5 やその他の原因でエラーが発生する可能性はあるためまだまだ油断できない状況ですが、
何よりもバッチ処理に変更を加えたい時にテスト環境があることや、通しでテストができることによる安心感など、エンジニアに与える メンタル面での恩恵が大きい ように感じます。

課題・次の改善アイデア

  1. データ量増加に伴う、メモリ・ディスク容量不足やタイムアウト
  2. 集計処理のパフォーマンスチューニングによる余分なメモリ・ディスク使用の削減
  3. 意図しない例外データが含まれていたケースの対応
  4. その日の夕方までに新たに追加されたデータで検証を行い、例外データのアラートを上げるような仕組み作り
  5. 頻繁に夕方バッチを実行するとAWSの利用料金がかさむ
  6. テスト範囲を絞ったり、input データのサンプリングなどによるコスト削減
  7. スクリプト自体の検証はできても、出力されるデータは検証できていない
  8. 夜間バッチと夕方バッチの結果を比較して意図しない差分が出ていないかを確認できるようにする

4に関しては、既に実現済みのため別途記事化予定(?)です。

最後に

「夕方バッチ」というバッチ処理のE2Eテスト環境を作ることで、エンジニアが安心してバッチ処理の開発・修正・更新ができるようになった話を紹介させていただきました。

カケハシではフロントエンドエンジニア、バックエンドエンジニア、データエンジニアなどなど、多くの業種で採用を強化しています。

  • 運用改善や開発環境の改善にも積極的に取り組んでいきたい

そんなエンジニアの方と一緒に仕事できると嬉しいです!