KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

Spark未経験のチームが2年間模索して実感した、効果的なパフォーマンス改善6選

こちらの記事は カケハシ Advent Calendar 2022 の17日目の記事になります。

こんにちは、カケハシで Musubi Insight のバックエンドエンジニアをしている末松です。

Musubi Insight に表示するデータは夜間の日次バッチで集計しているのですが、テスト・品質担保・パフォーマンスなどなど悩みが絶えません...
以前もバッチ処理のテストに関するブログを掲載しましたが、今回はパフォーマンスに関する記事になります! https://kakehashi-dev.hatenablog.com/entry/2022/08/12/094856

Musubi Insight データ集計の歴史

Musubi Insight プロダクトリリース時からデータ集計は AWS の Glue Job で作られていましたが、最初は Python Shell が中心でした。

利用薬局が増えデータ量も徐々に増加していく中で約2年前に PySpark での実装にチャレンジ、そこから全体的にデータ集計を Python Shell から PySpark に置き換えていく動きがありました。そして今では全てのデータ集計が PySpark で実装されています。

Musubi Insight の開発チームにはデータ基盤の構築経験に長けたエンジニアがいなかったこともあり、私を含めたバックエンドエンジニア3名で PySpark 周辺に関して模索しながらの実装でした。

そのため最初からパフォーマンス効率の良い実装ができたわけではなく、幾度となくパフォーマンスの課題にぶつかってきました。

この記事では、そんな Spark 未経験のチームが2年かけて模索した中で効果的だったパフォーマンス改善を紹介していきたいと思います。

同じように Spark にあまり詳しくなくて苦しんでいるチームの方に届くと嬉しいです...!!

効果的だったパフォーマンス改善

取り組みやすい順(主観ですが)にパフォーマンス改善策を紹介していきます。

取り組みやすさ、期待できる改善度合いなどを加味した 筆者おすすめ度 も付けてみました!(6選の中での相対的なおすすめ度)

  1. cacheを適切に設定する(おすすめ度 ★★★★★)
  2. 取得するデータ量を削減する(おすすめ度 ★★★★☆)
  3. Glueのバージョンアップ、ワーカー数・ワーカータイプの調整(おすすめ度 ★★★★☆)
  4. broadcast join の活用(おすすめ度 ★★★☆☆)
  5. UDF(ユーザー定義関数)を脱却する(おすすめ度 ★☆☆☆☆)
  6. 集計の順序を整理して無駄をなくす(おすすめ度 ★★☆☆☆)

以降、コード省略のため Spark Dataframe を生成する関数 create_spark_dataframe をたびたび登場させます。
以下のように Glue DataCatalog や S3 からデータを取得して SparkDataFrame として返すような関数をイメージしていただければと思います。

def create_spark_dataframe():
    return (
        # Glue DataCatalog から Glue の DynamicFrame として取得
        self.gc.create_dynamic_frame.from_catalog(
            database="database_name",
            table_name="table_name",
        )
        # Spark DataFrame に変換する
        .toDF()
    )

1. cacheを適切に設定する(おすすめ度 ★★★★★)

Spark が持つ「遅延評価」(※) という特性上、同じ処理が2度3度と実行される可能性があります。

DataFrame をcache させることで、これを防ぎます。

# create_spark_dataframeという関数は2回実行されてしまう
df = create_spark_dataframe()

process_1(df)
process_2(df)
# cacheすることで1回の実行で済む
df = create_spark_dataframe().cache()

process_1(df)
process_2(df)

そのため、同じDataFrameを使い回せるところがないか、cache を設定できる箇所がないかなど隅から隅まで確認することをお勧めします。

cache を設定するだけで、処理時間が1/2や1/3に短縮するなんてこともありました。

アクションがトリガーされるまでタスクが実行されず、アクションから逆算して実行計画が作られる

2. 取得するデータ量を削減する(おすすめ度 ★★★★☆)

続いてはデータの Input に関する内容になります。

データ集計のパフォーマンスはデータ量に大きく依存するので、扱うデータ量を減らすことができれば集計の時間もその分の削減が期待できます。

  • 期間を絞ることはできるだろうか
  • 不要な列を取得してないだろうか
  • 予め集計しておくことはできるだろうか

のように検討していきます。

from pyspark.sql import functions as F

df = (
    create_spark_dataframe()
    # 期間を絞る
    .filter(F.col("date").between(START_DATE, YESTERDAY))
    # 必要な列だけ取得する
    .select(
        "id",
        "date",
        # 予め集計できるものはしておく
        F.length("text").alias("text_length"),
    )
)

複雑なデータ型の列に対してのデータ量削減

上記の話は基礎的な内容なのでチームでも導入当初のうちから対応できていたのですが、
array (配列)や struct (連想配列)が組み合わさったような複雑なデータ型に対しての削減には対応できていませんでした。

※ 以降は Parquet 形式のデータを取得しての集計であることが前提になります

- items: array
    - elements: struct
        - id: string
        - name: string
        - values: array<string>
        - text: string

上記のような構造になっている items という列があると仮定します。

1. 集計で必要なのは id だけだった場合

シンプルにドットアクセスが可能です。

from pyspark.sql import functions as F

df = (
    create_spark_dataframe()
    .select(
        F.col("items.id").alias("item_ids")
    )
)

結果として以下のようなデータ構造になります。

- item_ids: array<string>

2. 集計で必要なのは idtext の文字数だけだった場合

arrays_ziptransform が活用できます。

from pyspark.sql import functions as F

df = (
    create_spark_dataframe()
    .select(
        F.arrays_zip(
            "items.id",
            F.transform("items.text", F.length)
        ).alias("items")
    )
    # arrays_zipではカラム名が0, 1になってしまうため、castしつつ付与する
    .cast("array<struct<id: string, text_length: int>>"),
)

結果として以下のようなデータ構造になります。

- items: array
    - elements: struct
        - id: string
        - text_length: int

実際に Musubi Insight チームでは例に挙げたものよりもさらに複雑で大きな列を扱うことがあり、arrays_ziptransform を使った対応を施すことでデータ処理時間やメモリ使用率を劇的に改善することができました。

3. Glueのバージョンアップ、ワーカー数・ワーカータイプの調整(おすすめ度 ★★★★☆)

タイトル通りのシンプルなチューニングですが、意外とバカにできないです。

Glueのバージョンアップ

基本的にバージョンを上げるだけでもパフォーマンス改善が期待できます。

実際チームでも 1.0 => 2.02.0 => 3.0 へのアップグレードをした経験がありますが、実感できるほどにパフォーマンスが改善した記憶があります。

ただし、古いバージョンで出力されたデータを新しいバージョンで読み取ろうとするとエラーするケースがあるので注意が必要です。(詳細は AWS の公式ガイドを参照)

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/release-notes.html

ワーカー数の調整

ワーカー数に関しては単純に上げるだけでもパフォーマンスは良くなる傾向にはありますが、コストパフォーマンスの面で最適なワーカー数を見極めるのが重要です。

GlueJob のメトリクスを有効にして、メモリやCPUの使用率を確認しましょう。

GlueJob Memory Profile

メモリ使用率に余裕がある => ワーカー数を減らせる可能性がある メモリ使用率に余裕がない => ワーカー数を増やした方がいいかもしれない

のような判断を普段から行い日々調整を行っています。

また、ワーカー数を増やすことによって I/O の効率が良くなり、データの入出力の速度が上がる傾向があります。

肌感ですが、コスパが良いのは最大でも40~50台くらいまでで、それ以上になるとそれほどパフォーマンス向上は見られない印象があります。
(もちろんデータや集計処理の特性にはよりますが)

ワーカータイプの調整

Glue には2種類のワーカータイプがあります。

G.1X ワーカータイプでは、各ワーカーは 1 DPU (4 vCPU、16 GB のメモリ、64 GB のディスク) にマッピングされており、ワーカーごとに 1 個のエグゼキュターを提供します。メモリを大量に消費するジョブには、このワーカータイプをお勧めします。 G.2X ワーカータイプでは、各ワーカーは 2 DPU (8 vCPU、32 GB のメモリ、128 GB のディスク) にマッピングされており、ワーカーごとに 1 個のエグゼキュターを提供します。メモリを大量に消費するジョブには、このワーカータイプをお勧めします。

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-jobs-job.html

  • IOがボトルネックであれば、G.1X
  • メモリがボトルネックであれば、G.2X

を選択するのが良いと思われます。

ETLのような集計処理に関して言えば、

  • 特定のデータを取得、変換、出力するような処理であれば、G.1X
  • 複数のデータを取得、joinして複雑な集計をし、出力するような処理であれば、G.2X

となると思いますが、実際には動かしてみないと良い/悪いの判断はできないような気がします...

4. broadcast join の活用(おすすめ度 ★★★☆☆)

Spark で A x B の join を行う場合、通常 A と B のデータは各ノードに分散されて結合が行われます。

しかし仮に B のデータがかなり小さい場合、broadcast(全てのノードに配布する)することでパフォーマンスの改善が期待できます。

from pyspark.sql import functions as F

A.join(
    F.broadcast(B),
    ["join_key"]
)

また、Spark の設定には autoBroadcastJoinThreshold というものがあり、デフォルトが 10MB になっています。

上記のコードのようにしなくとも、10MB以下のデータであれば Spark 側で自動的に broadcast join をしてくれます。

チームでの体感ですが、300MB ほどのデータであっても broadcast join をさせることでパフォーマンスの改善が期待できると思います。

broadcast join の活用方法をまとめると

  1. コード上で broadcast join を明示的に指示する
  2. autoBroadcastJoinThreshold の設定値を上げる

となります。

5. UDF(ユーザー定義関数)を脱却する(おすすめ度 ★☆☆☆☆)

ちょっと複雑な処理を実装する際、UDF(ユーザー定義関数)は非常に便利です。

pandas の apply のように、DataFrame の各行や各列に対して Python の関数を適用することができるので、PySpark 独自の書き方をあまり知らなくても Python の書き方で実装することができます。

しかし PySpark 独自の書き方ができるのであれば、UDF で実装されたものよりパフォーマンスが良くなると思います。

例えば、条件に応じてステータスを割り振る処理が UDF で実装されていた場合、

from pyspark.sql import functions as F

@F.UDF
def setting_status(cols):
    col1, col2 = cols
    if col1 < col2:
        return 1
    elif col1 == col2:
        return 2
    elif col1 > col2:
        return 3

df.withColumn(
    "status",
    setting_status(F.array("col1", "col2"))
)

以下のように UDF を脱却することができます。

from pyspark.sql import functions as F

df.withColumn(
    "status",
    F.when(F.col("col1") < F.col("col2"), F.lit(1))
    .when(F.col("col1") == F.col("col2"), F.lit(2))
    .when(F.col("col1") > F.col("col2"), F.lit(3)),
)

これはあくまで一例ですが、もし UDF を使っている箇所があれば一度 Spark の書き方で置き換えできないか調べてみると良いでしょう。

6. 集計の順序を整理して無駄をなくす(おすすめ度 ★★☆☆☆)

最後に紹介するのは、ちょっと一手間一苦労必要ですが集計の順序を整理してリファクタリングを行う方法です。

  • filter や join などで先にデータを小さくしてから集計できないだろうか
  • 大きなデータとの join はもっと後ろ倒しにできないだろうか
  • 無駄に join をしてないだろうか
  • もっとシンプルに集計できないだろうか
  • 使いまわせるデータはないだろうか、共通化できないだろうか

などを頭に入れつつ、集計処理をリファクタリングできないか検討すると良いでしょう。

やっていることはどんな言語でも共通するようなリファクタリングですが、結局のところこういった改善が一番パフォーマンスに影響したりします。(経験談)

またこのようなリファクタリングは、可読性の向上やバグの予防などパフォーマンス以外のメリットもあるので大事です。

最後に

Spark 未経験のチームが実感した6つのパフォーマンス改善を紹介させていただきました。

どれも試行錯誤の末結果の出た改善であり、紹介したもの以外にも多くの改善を試みてきました。

そんな Musubi Insight チームでは、データ集計やデータ基盤の設計・構築を得意とするデータエンジニアを募集しています。

ユーザーに直接届くデータを作りたい、整備したい、そんなエンジニアの方はぜひ一度お話ししましょう!

その他にも多種多様なポジションでの採用を行っておりますので、ぜひご確認ください!

参考

AWS Glue ETL パフォーマンス・チューニング① 基礎知識編 AWS Glue ETL パフォーマンス・チューニング② チューニングパターン編 AWS GlueでApache Sparkジョブをスケーリングし、データをパーティション分割するためのベストプラクティス