【Python】鈍足コードのリファクタリング

記事タイトルとURLをコピーする

はじめに

アプリケーションサービス部の鎌田(義)です。

アドベントカレンダー6日目ということで、
最近Pythonでコードを書いていて試行錯誤した内容をブログにしたいと思います。

概要

先日、業務で60万行程あるcsvファイルとログファイルの一部を結合する必要があった為、
Pythonで実装することにしました。

大きなデータを扱う処理を普段書く機会が少ないこともありますが、
突貫で書いて実行してみたところ、完了まで20分以上かかる結果となりました。

いろいろと初歩的な見落としもあり、修正を重ねた結果最終的に1分で完了するまで短縮できました。
せっかくの機会なのでリファクタリングの過程を備忘として記載します。

背景

本記事の本質的な部分ではない為、本項は読み飛ばして頂いても問題ありません。

JMeterで負荷試験を行った後に出力する統計レポートというcsvファイルがあり、
このレポートには試験シナリオの各ステップ(HTTPリクエスト)の結果が1行ごとに記載されています。

JMeter負荷試験では、同じ試験シナリオをログインユーザを変えて何スレッドもテストを行っていますが、
統計レポートには、ログインユーザの情報などは出力されません。
その為、ログインユーザの情報は別途ログに出力させるようにし、
後から統計レポート(csv)とログファイルに出力したログインユーザ情報を結合することにしました。

試験後にアプリケーション側でデバッグをする際などに各スレッドで使用したログインユーザ情報も確認したいという要件があった為、このような対応をしています。

使用したファイル

JMeterが生成する統計レポート.csvは以下のような形式です

  • loadtest_statistics_report.csv

ログインユーザ情報を記録した各ログは以下のような文字列です。
インスタンス毎にログを出力しており、インスタンスのIPアドレスをファイル名にしています。

  • <IPアドレス>.log
{"threadNum": "8", "userId": "testUser06"}
{"threadNum": "10", "userId": "testUser08"}

検証環境

Amazon EC2

  • AMI
    • al2023-ami-minimal-2023.1.20230809.0-kernel-6.1-x86_64
  • インスタンスタイプ
    • r5.xlarge (CPU: 4, Memory: 32GB)

Python

  • バージョン
    • 3.11.4
  • パッケージ
    • 標準パッケージのみ

検証に使用したデータ

検証コードの中で使用したリストの中身は以下のような形式です。

  • statistics_report (統計レポート.csvを読み込んだリストデータ)
[
    {'timeStamp': '1692763970640', 'elapsed': '556', 'label': '1-1. トップページを開く /', 'responseCode': '200', 'responseMessage': 'OK', 'threadName': '192.168.10.101-負荷テストサンプル 1-6', 'dataType': 'text', 'success': 'true', 'failureMessage': '', 'bytes': '6265', 'sentBytes': '666', 'grpThreads': '12', 'allThreads': '12', 'URL': 'https://test.co.jp/', 'Latency': '554', 'IdleTime': '0', 'Connect': '460'},
    {'timeStamp': '1692763971110', 'elapsed': '383', 'label': '1-1. トップページを開く /', 'responseCode': '200', 'responseMessage': 'OK', 'threadName': '192.168.20.102-負荷テストサンプル 1-8', 'dataType': 'text', 'success': 'true', 'failureMessage': '', 'bytes': '6265', 'sentBytes': '666', 'grpThreads': '12', 'allThreads': '12', 'URL': 'https://test.co.jp/', 'Latency': '380', 'IdleTime': '0', 'Connect': '309'},
    {...},
]
  • statistics_report_2d (statistics_reportをインスタンス毎に2次元に変換したリスト)
[
    [
        {'timeStamp': '1692763970640', 'elapsed': '556', 'label': '1-1. トップページを開く /', 'responseCode': '200', 'responseMessage': 'OK', 'threadName': '192.168.10.101-負荷テストサンプル 1-6', 'dataType': 'text', 'success': 'true', 'failureMessage': '', 'bytes': '6265', 'sentBytes': '666', 'grpThreads': '12', 'allThreads': '12', 'URL': 'https://test.co.jp/', 'Latency': '554', 'IdleTime': '0', 'Connect': '460'},
        {...},
    ],
    [
        {'timeStamp': '1692763971110', 'elapsed': '383', 'label': '1-1. トップページを開く /', 'responseCode': '200', 'responseMessage': 'OK', 'threadName': '192.168.20.102-負荷テストサンプル 1-8', 'dataType': 'text', 'success': 'true', 'failureMessage': '', 'bytes': '6265', 'sentBytes': '666', 'grpThreads': '12', 'allThreads': '12', 'URL': 'https://test.co.jp/', 'Latency': '380', 'IdleTime': '0', 'Connect': '309'},
        {...},
    ],
    [
        {...},
    ]
]
  • logs_2d (各インスタンス毎のログファイルを読み込んだリスト。文字列を辞書型に変換、IPアドレスを追加で付与しています。)
[
    [
        {'threadNum': '8', 'userId': 'testUser06', 'ipAddress': '192.168.10.101'},
        {...},
    ],
    [
        {'threadNum': '11', 'userId': 'testUser02', 'ipAddress': '192.168.20.102'},
        {...},
    ],
    [
        {'threadNum': '8', 'userId': 'testUser06', 'ipAddress': '192.168.30.103'},
        {...},
    ]
]
  • logs (logs_2dを1次元に変換したリスト)
[
    {'threadNum': '8', 'userId': 'testUser06', 'ipAddress': '192.168.10.101'},
    {'threadNum': '10', 'userId': 'testUser08', 'ipAddress': '192.168.10.101'},
    {...},
]

検証

最初に書いたコード

import csv
import json
import itertools
import time
  
log_files = ["192-168-10-101.log", "192-168-20-102.log", "192-168-30-103.log"]
report_file = "loadtest_statistics_report.csv"
  
  
def load_statistics_report():
    """統計レポートを読み込む関数"""
    with open(f"reports/{report_file}", "r") as f:
        reader = csv.DictReader(f)
        statistics_report = [row for row in reader]
    return statistics_report
  
  
def to_dict_and_dict_param_add(str_row, ip_addr):
    """文字列の辞書変換とIPアドレスを辞書に追加する関数"""
    dict_row = json.loads(str_row)
    dict_row["ipAddress"] = ip_addr
    return dict_row
  
  
def load_logs_2d():
    """ログファイルを読み込む関数"""
    logs_2d = []
    for log_file in log_files:
        ip_addr = log_file.split(".")[0].replace("-", ".")
        with open(f"logs/{log_file}", "r") as f:
            logs_2d.append([to_dict_and_dict_param_add(row, ip_addr) for row in f])
    return logs_2d
  
  
def merge_login_user(statistics_report, logs):
    """統計レポートとログをマージする関数
    統計レポートのthreadNameに含まれる、IPアドレスとスレッド番号を取得し
    一致するログのuserIdを統計レポートに追加する
    """
    merged_report = []
    for report_row in statistics_report:
        ip_addr = report_row["threadName"].split("-")[0]
        thread_num = report_row["threadName"].split("-")[-1]
        # 約60万行のレポートデータ x 約2万行のログデータの2重ループ
        target_logs = list(filter(lambda log: log["ipAddress"] == ip_addr and log["threadNum"] == thread_num, logs))
        if not target_logs:
            continue
        report_row["userId"] = target_logs[0]["userId"]
  
        merged_report.append(report_row)
    return merged_report
  
  
if __name__ == "__main__":
    start = time.time()
  
    statistics_report = load_statistics_report()
    logs_2d = load_logs_2d()
    logs = list(itertools.chain.from_iterable(logs_2d))
  
    # ここがボトルネック
    merged_report = merge_login_user(statistics_report, logs)
  
    end = time.time()
    print(f"実行時間(秒): {end - start}")
  • 実行結果
    • 実行時間(秒): 1367.538246870041 (約22分)

ボトルネックになっているコード

merge_login_userの以下のコードがボトルネックとなっていました。
60万行のレポートと2万行のログファイルの2重ループで
120億回(60万 * 2万)も繰り返す為、非常に遅いコードとなっています。

レポートを1行ずつチェックし、ログファイルに合致するIPアドレスとスレッド番号がある場合ログを抽出しています。

for report_row in statistics_report:
    ip_addr = report_row["threadName"].split("-")[0]
    thread_num = report_row["threadName"].split("-")[-1]
    # 約60万行のレポートデータ x 約2万行のログデータの2重ループ
    target_logs = list(filter(lambda log: log["ipAddress"] == ip_addr and log["threadNum"] == thread_num, logs))

以下はfor文で書き直したコードですが、今回の場合リスト全てを走査する必要はない為
合致するログを見つけ次第breakすべきでした。

target_logs = []
~~~
  
for log in logs:
    if log["ipAddress"] == ip_addr and log["threadNum"] == thread_num:
        target_logs.append(log)

リファクタリング

ループ回数を削減

各ログファイルのスレッド番号とIPアドレスの組み合わせは、ユニークなことが今回分かっているので
合致する1行を見つけたらループを終了して結果を返すようにしました。
内包表記ではbreakが使用できない為、以下のように修正しました。
ジェネレータを生成して1回だけ実行しています。

# target_logs = list(filter(lambda log: log["ipAddress"] == ip_addr and log["threadNum"] == thread_num, logs))
target_log = next(log for log in logs if log["ipAddress"] == ip_addr and log["threadNum"] == thread_num)

もしくは以下のように修正

target_log = None
~~~
  
for log in logs:
    if log["ipAddress"] == ip_addr and log["threadNum"] == thread_num:
        target_log = log
        break

merge_login_user関数を修正しました。

def merge_login_user(statistics_report, logs):
    """統計レポートとログをマージする関数
    統計レポートのthreadNameに含まれる、IPアドレスとスレッド番号を取得し
    一致するログのuserIdを統計レポートに追加する
    """
    merged_report = []
    for report_row in statistics_report:
        ip_addr = report_row["threadName"].split("-")[0]
        thread_num = report_row["threadName"].split("-")[-1]
        try:
            target_log = next(log for log in logs if log["ipAddress"] == ip_addr and log["threadNum"] == thread_num)
        except StopIteration:
            continue
        report_row["userId"] = target_log["userId"]
  
        merged_report.append(report_row)
    return merged_report

これだけで大幅に時間が短縮されました。

  • 実行時間(秒): 344.01408195495605 (約5分44秒)

if条件の見直し

if文の判定順序を変更しました。
今回は、判定順序に特に意味がない、かつスレッド番号は各IPアドレス毎にユニークな為、
よりデータを絞り込めるスレッド番号を先に判定することでさらに早くなりました。
(ログファイルの中に同一IPアドレスは6667個存在するが、同一スレッド番号は3個しか存在しない)

# target_log = next(log for log in logs if log["ipAddress"] == ip_addr and log["threadNum"] == thread_num)
target_log = next(log for log in logs if log["threadNum"] == thread_num and log["ipAddress"] == ip_addr)
  • 実行時間(秒): 280.7838773727417 (約4分40秒)

ロジックの変更(ファイル分割)

2重ループはオーダー記法で表すとO(n2)となり、今回の例ですと、約60万 x 約2万=120億の計算量となります。
※オーダー記法はアルゴリズムの比較をする際などで用いられる計算量を表現する方法です。
例えばIPアドレス毎にレポートとログファイルを分割し、分割したファイル同士を使って2重ループすると約20万 x 約7千=14億となります。
さらに3回繰り返す為、14億 x 3回=42億になりますが、それでも当初の3分の1ほどとなり計算量を大きく削減できます。

統計レポートを2次元に変換するよう以下の関数を追加しました。

def statistics_report_to_2d(statistics_report):
    """統計レポートを2次元配列に変換する関数"""
    statistics_report_2_d = []
    for log_file in log_files:
        ip_addr = log_file.split(".")[0].replace("-", ".")
        statistics_report_2_d.append(list(filter(lambda row: row["threadName"].split("-")[0] == ip_addr, statistics_report)))
  
    return statistics_report_2_d

実行部分を以下のように修正しました。

if __name__ == "__main__":
    start = time.time()
  
    statistics_report = load_statistics_report()
    logs_2d = load_logs_2d()
    statistics_report_2d = statistics_report_to_2d(statistics_report)
  
    merged_report = []
    for statistics_report_1d, logs_1d in zip(statistics_report_2d, logs_2d):
        merged_report.extend(merge_login_user(statistics_report_1d, logs_1d))
  
    end = time.time()
    print(f"実行時間(秒): {end - start}")

statistics_reportもlogsも、次元数を揃え同じ順序で並んでいる為
IPアドレスの判定が不要になる為、merge_login_user関数を以下のように修正しました。

def merge_login_user(statistics_report, logs):
    """統計レポートとログをマージする関数
    統計レポートのthreadNameに含まれる、IPアドレスとスレッド番号を取得し
    一致するログのuserIdを統計レポートに追加する
    """
    merged_report = []
    for report_row in statistics_report:
        thread_num = report_row["threadName"].split("-")[-1]
        try:
            # ここを修正しました
            target_log = next(log for log in logs if log["threadNum"] == thread_num)
        except StopIteration:
            continue
        report_row["userId"] = target_log["userId"]
  
        merged_report.append(report_row)
    return merged_report
  • 実行時間(秒): 99.23363924026489 (約1分39秒)

並列実行(3プロセス)

マルチプロセスの実行には、今回はconcurrentを使用します。
また、今回はプロセス数を3で実行する為、あらかじめproc_numとして定義しておきます。

以下のコードを冒頭に追加します。

from concurrent.futures import ProcessPoolExecutor
  
proc_num = 3

実行部分を以下のように修正しました。

if __name__ == "__main__":
    start = time.time()
  
    statistics_report = load_statistics_report()
    logs_2d = load_logs_2d()
    statistics_report_2d = statistics_report_to_2d(statistics_report)
  
    merged_report = []
    with ProcessPoolExecutor(max_workers=proc_num) as executor:
        for data in executor.map(merge_login_user, statistics_report_2d, logs_2d):
            merged_report.extend(data)
  
    end = time.time()
    print(f"実行時間(秒): {end - start}")
  • 実行時間(秒): 59.61389374732971

子プロセスが新たに3つ立ち上がり、
3コア使用することになる為、CPUへの負荷は上がる点にご注意下さい。
また、各プロセスで実行する関数にサイズの大きなデータを渡しているので、Memory使用量も上がります。
参考値として、これまでのシングルプロセスでは、最大でCPU1コアの使用率が100%、Memory使用量が最大約1GB程度でしたが、
3プロセスでは、最大で3コアの使用率が100%、Memory使用量が最大約3GBとなっていました。

まとめ

リファクタリングしていく過程はいずれも初歩的で基本的な内容かもしれませんが、
実際に実行時間が短縮されている様子を見るのは楽しかったです。

並列処理は最後にお試し的に使ってみましたが、
まずはデータの特性を理解してコードを見直すことで改善できる可能性はあるのかなと思います。

何かの参考になれば幸いです。
最後までご覧頂きありがとうございました。

参考

[初心者向け] プログラムの計算量を求める方法
concurrent.futures -- 並列タスク実行