
TensorFlow
イベント
該当するコンテンツが見つかりませんでした
マガジン
技術ブログ
G-gen の片岩です。当記事では Vertex AI Custom Training において カスタムコンテナ を使用し、標準では提供されていない LightGBM モデルの学習から 寄与度(SHAP)の出力 まで実行する方法を紹介します。 はじめに ビルド済みコンテナとカスタムコンテナの使い分け カスタムコンテナの利点 構成図 初期設定 データの準備と分割 カスタムコンテナの準備 ディレクトリとリポジトリの準備 学習スクリプトの作成 Dockerfile の作成 コンテナのビルドとプッシュ 学習ジョブの実行 推論と評価指標の確認 分析レポートの解釈 はじめに ビルド済みコンテナとカスタムコンテナの使い分け Vertex AI Custom Training には学習ジョブを実行するためのコンテナイメージとして、大きく 2 つの選択肢があります。 コンテナの種類 特徴 向いているケース ビルド済みコンテナ Google Cloud が用意したイメージ XGBoost や TensorFlow など、標準的なフレームワークをすぐに使いたい時 カスタムコンテナ 自分で Dockerfile を書いて作成するイメージ LightGBM など未提供のライブラリを使いたい時や、独自の処理を組み込みたい時 カスタムコンテナの利点 ビルド済みコンテナでもジョブ実行時の引数に requirements=["lightgbm", "shap"] のように指定することでライブラリを追加できます。ビルド済みコンテナについては以下の記事を参照してください。 blog.g-gen.co.jp しかし実務の本番運用において、実行時にライブラリを動的にインストールすることは、以下のデメリットがあります。 1 点目は、 環境の再現性が低下する ことです。 ジョブを実行するたびにインターネットから最新のパッケージを取得するため、依存ライブラリのバージョンが上がったために突然ジョブが落ちたり、学習結果が変わってしまうといった、本番運用で避けたいリスクを招きます。 2 点目は、 実行のたびにオーバーヘッドが発生する ことです。 毎回ライブラリをダウンロードしてインストールする処理が走るため、余計な待ち時間が発生します。 カスタムコンテナを利用することにより、上記のデメリットを回避できます。 参考 : カスタム コンテナの概要 構成図 当記事で紹介する手順に関する構成図は以下のとおりです。環境構築の負荷を軽減するため、ソースコードの作成や Python 実行環境に Colab Enterprise を使用します。 初期設定 はじめにライブラリのインストールと環境変数の設定を行います。今回は可視化や解釈のためのライブラリ( seaborn 、 shap )も追加します。 # 必要なライブラリのインストール !pip install google-cloud-aiplatform lightgbm shap scikit-learn pandas seaborn matplotlib -q # プロジェクトとリージョンの設定 # ※ ご自身の環境に合わせて書き換えてください PROJECT_ID = "your-project-id" LOCATION = "asia-northeast1" # バケットとフォルダの定義 ROOT_BUCKET = "gs://your-bucket" EXPERIMENT_NAME = "diamonds-lgbm-v1" WORK_DIR = f "{ROOT_BUCKET}/{EXPERIMENT_NAME}" # Vertex AI SDK の初期化 from google.cloud import aiplatform aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=WORK_DIR) # バケットが存在しない場合のみ作成 !gsutil mb -l {LOCATION} {ROOT_BUCKET} データの準備と分割 データは機械学習デモで使用されるダイヤモンドの価格データを使用します。このデータはカラットなどの数値データや、カットや色といったカテゴリ変数を含みます。 学習データと推論データに分割して Cloud Storage に保存します。 import seaborn as sns from sklearn.model_selection import train_test_split import pandas as pd # データのロード (~54,000行) df = sns.load_dataset( 'diamonds' ) # 文字列カラムを 'category' 型に変換 cat_cols = [ 'cut' , 'color' , 'clarity' ] for col in cat_cols: df[col] = df[col].astype( 'category' ) # 学習データと推論データに 90:10 の割合で分割 train_full_df, test_df = train_test_split(df, test_size= 0.1 , random_state= 42 ) # データの保存 train_filename = "train.csv" train_full_df.to_csv(train_filename, index= False ) test_filename = "test.csv" test_df.to_csv(test_filename, index= False ) # GCS へアップロード !gsutil cp {train_filename} {WORK_DIR}/data/{train_filename} !gsutil cp {test_filename} {WORK_DIR}/data/{test_filename} print (f "学習データ: {WORK_DIR}/data/{train_filename}" ) print (f "推論データ: {WORK_DIR}/data/{test_filename}" ) カスタムコンテナの準備 ディレクトリとリポジトリの準備 Colab Enterprise 上に作業ディレクトリを用意し、Google Cloud 上に完成したコンテナの保存先となる Artifact Registry のリポジトリを作成します。 # 作業用ディレクトリの作成 !mkdir -p custom_container # Artifact Registry にリポジトリを作成 (初回のみ) !gcloud artifacts repositories create custom-training-repo \ --repository- format =docker \ --location={LOCATION} \ --description= "Custom Training Repository" || true 学習スクリプトの作成 コンテナ内で実行される task.py を作成します。 今回はモデルの学習だけでなく、過学習を確認するための学習曲線と、予測の根拠を説明するための寄与度の画像を生成し、モデルと一緒に Cloud Storage へアップロードする処理を組み込みます。 %%writefile custom_container/task.py import argparse import os import pandas as pd import lightgbm as lgb import shap import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split from google.cloud import storage from urllib.parse import urlparse import warnings warnings.filterwarnings( 'ignore' ) parser = argparse.ArgumentParser() parser.add_argument( '--train-data-uri' , dest= 'train_data_uri' , type = str , required= True ) args = parser.parse_args() # --- GCS ダウンロード / アップロード用の関数 --- def download_from_gcs (gcs_uri, local_file): parsed_url = urlparse(gcs_uri) client = storage.Client() bucket = client.bucket(parsed_url.netloc) blob = bucket.blob(parsed_url.path.lstrip( "/" )) blob.download_to_filename(local_file) def upload_to_gcs (local_file, gcs_dir): parsed_url = urlparse(gcs_dir) client = storage.Client() bucket = client.bucket(parsed_url.netloc) blob_path = f "{parsed_url.path.lstrip('/').rstrip('/')}/{local_file}" bucket.blob(blob_path).upload_from_filename(local_file) # --- 1. データの準備 --- print (f "Downloading data from {args.train_data_uri}..." , flush= True ) local_train_file = "train.csv" download_from_gcs(args.train_data_uri, local_train_file) df = pd.read_csv(local_train_file) cat_cols = [ 'cut' , 'color' , 'clarity' ] for col in cat_cols: df[col] = df[col].astype( 'category' ) X = df.drop(columns=[ "price" ]) y = df[ "price" ] # スクリプト内で学習用と検証用に分割 (データリーク防止) X_train, X_val, y_train, y_val = train_test_split(X, y, test_size= 0.1 , random_state= 42 ) # --- 2. モデルの学習 --- print ( "Training LightGBM model..." , flush= True ) model = lgb.LGBMRegressor(n_estimators= 100 , random_state= 42 ) # 学習過程を記録するために eval_set を渡す model.fit( X_train, y_train, eval_set=[(X_train, y_train), (X_val, y_val)], eval_names=[ 'train' , 'valid' ] ) # --- 3. 分析画像の生成と保存 --- # ① 学習曲線の描画 lgb.plot_metric(model, metric= 'l2' ) plt.title( 'Learning Curve (MSE)' ) plt.tight_layout() plt.savefig( "learning_curve.png" ) plt.close() # ② SHAP値(寄与度)の描画 print ( "Calculating SHAP values..." , flush= True ) explainer = shap.TreeExplainer(model) shap_values = explainer(X_val.sample( min ( 1000 , len (X_val)), random_state= 42 )) plt.figure() shap.plots.beeswarm(shap_values, show= False ) plt.title( "SHAP Feature Importance" ) plt.tight_layout() plt.savefig( "shap_importance.png" ) plt.close() # --- 4. 成果物のアップロード --- aip_model_dir = os.getenv( "AIP_MODEL_DIR" ) if aip_model_dir: print (f "Uploading artifacts to: {aip_model_dir}" , flush= True ) model.booster_.save_model( "model.txt" ) upload_to_gcs( "model.txt" , aip_model_dir) upload_to_gcs( "learning_curve.png" , aip_model_dir) upload_to_gcs( "shap_importance.png" , aip_model_dir) print ( "Upload completed." , flush= True ) Dockerfile の作成 Dockerfile を記述します。ベースイメージには Python 3.12 を指定し、LightGBM に必要な libgomp1 をインストールします。 %%writefile custom_container/Dockerfile FROM python: 3.12 -slim # LightGBM に必須の OS ライブラリをインストール RUN apt-get update && apt-get install -y --no-install-recommends \ libgomp1 \ && rm -rf /var/lib/apt/lists/* # 必要な Python ライブラリのインストール RUN pip install --no-cache- dir \ pandas scikit-learn lightgbm shap matplotlib google-cloud-storage WORKDIR /app COPY task.py /app/task.py ENTRYPOINT [ "python" , "task.py" ] コンテナのビルドとプッシュ Cloud Build を使用してコンテナをビルドし、プッシュします。 # Cloud Build でビルドとプッシュを実行 REPO_NAME = "custom-training-repo" IMAGE_URI = f "{LOCATION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/lgbm-shap-trainer:latest" !gcloud builds submit --tag {IMAGE_URI} ./custom_container 学習ジョブの実行 作成した自作コンテナ ( IMAGE_URI ) を指定して、学習ジョブを送信します。引数 base_output_dir を指定することで、指定した Cloud Storage のパス配下にモデルや画像を保存できます。 # ジョブの定義 job = aiplatform.CustomContainerTrainingJob( display_name= "diamonds-lgbm-shap-job" , container_uri=IMAGE_URI, ) # ジョブの実行 print ( "ジョブを送信しました。完了までお待ちください..." ) job.run( machine_type= "n1-standard-4" , replica_count= 1 , args=[ f "--train-data-uri={WORK_DIR}/data/train.csv" ], # 成果物の保存先フォルダを指定 base_output_dir=f "{WORK_DIR}/model_output" ) 推論と評価指標の確認 ジョブ完了後、Cloud Storage から学習済みモデルをダウンロードし、Colab Enterprise 上でテストデータに対する精度評価を行います。 import numpy as np import lightgbm as lgb from sklearn.metrics import r2_score, mean_squared_error import pandas as pd # 1. 学習の成果物のダウンロード MODEL_DIR = f "{WORK_DIR}/model_output/model" print ( "学習済みモデルと分析画像をダウンロードします..." ) !gsutil cp {MODEL_DIR}/model.txt . !gsutil cp {MODEL_DIR}/learning_curve.png . !gsutil cp {MODEL_DIR}/shap_importance.png . # 2. テストデータの読み込み df_test = pd.read_csv(f "{WORK_DIR}/data/test.csv" ) cat_cols = [ 'cut' , 'color' , 'clarity' ] for col in cat_cols: df_test[col] = df_test[col].astype( 'category' ) X_test = df_test.drop(columns=[ "price" ]) y_true = df_test[ "price" ] # 3. ローカル推論の実行 local_model = lgb.Booster(model_file= "model.txt" ) predictions = local_model.predict(X_test) # 4. 評価指標の計算と表示 r2 = r2_score(y_true, predictions) rmse = np.sqrt(mean_squared_error(y_true, predictions)) print ( "-" * 30 ) print (f "評価結果 (データ数: {len(y_true)}件)" ) print (f "R2 Score (決定係数): {r2:.4f}" ) print (f "RMSE (誤差の大きさ): {rmse:.4f}" ) print ( "-" * 30 ) 以下は筆者の環境における実行結果です。R2スコアが 0.98 を超える精度の高いモデルが作成できました。 ------------------------------ 評価結果 (データ数: 5394件) R2 Score (決定係数): 0.9817 RMSE (誤差の大きさ): 543.6218 ------------------------------ 分析レポートの解釈 単に予測精度を出すだけでなく、AI が なぜその予測をしたのか を解釈することは実務において重要です。コンテナ内で生成した学習曲線の画像と SHAP を用いた個別データの分析結果を確認します。 import shap from IPython.display import Image, display print ( "=== 学習曲線 (過学習の確認) ===" ) display(Image( "learning_curve.png" )) print ( " \n === 全体の寄与度 (SHAP Beeswarm) ===" ) display(Image( "shap_importance.png" )) # --- 個別データに対するSHAP(表形式)--- print ( " \n === 特定のデータ(1件目)の予測の根拠 ===" ) explainer = shap.TreeExplainer(local_model) single_instance = X_test.iloc[[ 0 ]] shap_values_single = explainer(single_instance) shap_df = pd.DataFrame({ "特徴量 (Feature)" : single_instance.columns, "実際の値 (Value)" : single_instance.values[ 0 ], "価格への影響 (SHAP値)" : shap_values_single.values[ 0 ] }) shap_df = shap_df.reindex(shap_df[ "価格への影響 (SHAP値)" ].abs().sort_values(ascending= False ).index) base_value = explainer.expected_value predicted_price = predictions[ 0 ] print (f "【ベースライン価格 (平均)】: {base_value:.2f}" ) display(shap_df.style.format({ "価格への影響 (SHAP値)" : "{:+.2f}" }).hide(axis= "index" )) print (f "【最終予測価格】: {predicted_price:.2f}" ) 学習曲線(Learning Curve) を確認すると、学習データと検証データの誤差(MSE)が共に右肩下がりで収束しています。 これは、未知のデータである検証データに対しても過学習を起こすことなく学習ができている証拠です。 全体の寄与度 では、上にある特徴量ほど予測への影響力が大きいことを示しています。横軸の 0 を基準に、右側が 価格を上げる要因 、左側が 価格を下げる要因 です。 プロットの赤色は数値が大きいデータであり、青色は数値の小さいデータを表しています。例えば carat は右側に赤色でプロットされているため、 carat が大きいほど高価になる ことが分かります。 最後に 特定の1件に対する予測の根拠 を表形式で出力しました。 全体の平均価格(ベースライン)を基準として、「重さが0.24カラットと小さいためマイナス評価」「透明度(clarity)がVVS1と高品質であるためプラス評価」といったように、最終的な予測価格に至るまでの内部の計算ロジックをビジネス部門に説明できます。 片岩 裕貴 (記事一覧) クラウドソリューション部 クラウドディベロッパー課 和歌山県在住のエンジニア。興味分野はAI/ML。Google Cloud Partner Top Engineer に選出(2025 / 2026)。
G-gen の片岩です。当記事では Vertex AI Custom Training を使用して、機械学習モデルのトレーニングから推論まで実行する方法を紹介します。Vertex AI Custom Training を使うことで、クラウド上のフルマネージド環境で機械学習モデルをトレーニングできます。 はじめに 機械学習モデルのトレーニング手法 Vertex AI Custom Training とは 構成図 初期設定 データの準備と分割 学習スクリプトの作成 学習ジョブの実行 推論の実行 推論結果の確認 はじめに 機械学習モデルのトレーニング手法 Google Cloud のマネージドサービスを使って機械学習モデルをトレーニングする方法は、大きく分けて3つあります。 手法 向いているケース Vertex AI AutoML データを用意するだけで、少ない手順でモデルをトレーニングしたい時 BigQuery ML BigQuery のデータを使用して SQL でモデルをトレーニングしたい時 Vertex AI Custom Training モデルの開発環境や工程を細かく制御したい時 当記事では、最も柔軟性の高い Vertex AI Custom Training に焦点を当て、モデルのトレーニングおよび推論を実行する方法をご紹介します。 Vertex AI についての解説、Vertex AI AutoML や BigQuery ML についての詳細は以下の記事を参照してください。 blog.g-gen.co.jp blog.g-gen.co.jp blog.g-gen.co.jp Vertex AI Custom Training とは Vertex AI Custom Training は、PyTorch、TensorFlow、Scikit-learn や XGBoost といったフレームワークを実行できるフルマネージドサービスです。 ローカルで開発した Python コードを Docker コンテナとして実行するため、サーバーの起動や停止といったインフラ管理を意識せず、 コードの自由度 と クラウドのスケーラビリティ を両立できます。 AutoML では対応しきれない細かいチューニングや、独自のアルゴリズムを実装したいエンジニアにとって、有用な選択肢です。 参考 : カスタム トレーニングの初心者向けガイド 構成図 当記事で紹介する手順に関する構成図は、以下のとおりです。環境構築の負荷を軽減するため、ソースコードの作成や Python 実行環境に Colab Enterprise を使用します。 Colab Enterprise については以下の記事を参照してください。 blog.g-gen.co.jp 初期設定 はじめに、ライブラリのインストールと環境変数の設定を行います。 今後、別のモデルを作成する時にバケットを使い回せるようにするため、Cloud Storage はバケットの中にディレクトリを作成します。 # 必要なライブラリのインストール !pip install google-cloud-aiplatform xgboost scikit-learn pandas -q # プロジェクトとリージョンの設定 # ※ ご自身の環境に合わせて書き換えてください PROJECT_ID = "your-project-id" LOCATION = "asia-northeast1" # バケットとフォルダの定義 ROOT_BUCKET = "gs://your-bucket" EXPERIMENT_NAME = "california-housing-xgb-v1" WORK_DIR = f "{ROOT_BUCKET}/{EXPERIMENT_NAME}" # Vertex AI SDK の初期化 from google.cloud import aiplatform # staging_bucket を指定すると、自動生成されるファイルもこのフォルダに整理されます aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=WORK_DIR) # バケットが存在しない場合のみ作成 !gsutil mb -l {LOCATION} {ROOT_BUCKET} データの準備と分割 データは機械学習デモにしばしば使用される California Housing(カリフォルニアの住宅価格)を使用します。このデータセットは、scikit-learn に含まれています。 モデルの作成に使用する学習データと推論に使用する推論データに分割し、Cloud Storage に保存します。 from sklearn.datasets import fetch_california_housing from sklearn.model_selection import train_test_split import pandas as pd # データのロード data = fetch_california_housing(as_frame= True ) df = data.frame # 学習データと推論データに 90:10 の割合で分割 train_full_df, test_df = train_test_split(df, test_size= 0.1 , random_state= 42 ) # 学習データの保存 train_filename = "train.csv" train_full_df.to_csv(train_filename, index= False ) # 推論データの保存 test_filename = "test.csv" test_df.to_csv(test_filename, index= False ) # GCS へアップロード (フォルダ /data 配下へ) !gsutil cp {train_filename} {WORK_DIR}/data/{train_filename} !gsutil cp {test_filename} {WORK_DIR}/data/{test_filename} print (f "学習データ: {WORK_DIR}/data/{train_filename}" ) print (f "推論データ: {WORK_DIR}/data/{test_filename}" ) 学習スクリプトの作成 Vertex AI Custom Training として実行する Python スクリプトを作成します。今回は機械学習アルゴリズムに XGBoost を採用します。 環境変数 AIP_MODEL_DIR で与えられた Cloud Storage に作成したモデルを保存すると、自動的に Vertex AI Model Registry にアップロードされます。 %%writefile task.py import argparse import pandas as pd import xgboost as xgb from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error import os from google.cloud import storage from urllib.parse import urlparse # 引数の受け取り parser = argparse.ArgumentParser() parser.add_argument( '--train-data-uri' , dest= 'train_data_uri' , type = str , required= True ) args = parser.parse_args() # GCS から学習用データセットをダウンロード parsed_url = urlparse(args.train_data_uri) bucket_name = parsed_url.netloc blob_path = parsed_url.path.lstrip( "/" ) local_filename = "downloaded_train.csv" client = storage.Client() bucket = client.bucket(bucket_name) blob = bucket.blob(blob_path) print (f "Downloading training data from: {args.train_data_uri}" , flush= True ) blob.download_to_filename(local_filename) # 学習用データセットを読み込む df = pd.read_csv(args.train_data_uri) # 特徴量とターゲットに分離し、Numpy Array に変換 target_col = "MedHouseVal" X = df.drop(columns=[target_col]).values y = df[target_col].values # トレーニングに必要な学習データと評価データに分割する X_train, X_val, y_train, y_val = train_test_split(X, y, test_size= 0.1 , random_state= 42 ) # XGBoost モデルの構築と学習 model = xgb.XGBRegressor( n_estimators= 100 , learning_rate= 0.1 , max_depth= 5 , random_state= 42 , objective= 'reg:squarederror' ) model.fit( X_train, y_train, eval_set=[(X_val, y_val)], verbose= True ) # 評価スコアを Cloud Logging に出力する score = model.score(X_val, y_val) mse = mean_squared_error(y_val, model.predict(X_val)) print (f "Validation Score (R^2): {score:.4f}" , flush= True ) print (f "Validation MSE: {mse:.4f}" , flush= True ) # モデルの保存 model_filename = "model.bst" model.save_model(model_filename) aip_model_dir = os.getenv( "AIP_MODEL_DIR" ) if aip_model_dir: print (f "Uploading model to: {aip_model_dir}" , flush= True ) parsed_url = urlparse(aip_model_dir) bucket_name = parsed_url.netloc blob_path = parsed_url.path.lstrip( "/" ).rstrip( "/" ) + "/" + model_filename # Cloud Storage にアップロード bucket = client.bucket(bucket_name) blob = bucket.blob(blob_path) blob.upload_from_filename(model_filename) print ( "Model upload completed." , flush= True ) 学習ジョブの実行 作成したスクリプトをコンテナで実行します。今回は Google Cloud が 提供するビルド済みコンテナを使用するため、Dockerfile の作成は不要です。 また、軽量なモデルのため n1-standard-4 マシンタイプを使用しますが、巨大なモデルをトレーニングしたい場合は GPU を使用することで高速に処理できます。 # ジョブの定義 job = aiplatform.CustomTrainingJob( display_name= "california-housing-xgb-job" , script_path= "task.py" , container_uri= "asia-docker.pkg.dev/vertex-ai/training/xgboost-cpu.2-1:latest" , requirements=[ "pandas" , "google-cloud-storage" ], staging_bucket=WORK_DIR, model_serving_container_image_uri= "asia-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.2-1:latest" ) # ジョブの実行 print ( "ジョブを送信しました。完了まで5分ほどお待ちください..." ) model = job.run( machine_type= "n1-standard-4" , replica_count= 1 , model_display_name= "california-housing-xgb-model" , # GCS上の学習データパスを引数で渡す args=[f "--train-data-uri={WORK_DIR}/data/train.csv" ] ) ジョブ終了後に Cloud Storage にモデルが保存されていることを確認できます。 Google Cloud コンソールの モデルレジストリの画面でモデルが作成されていることを確認できます。 推論の実行 本番運用では Vertex AI に備わっているオンライン推論やバッチ推論を使用して大量のリクエストを捌きますが、ここでは Colab Enterprise の実行環境上で推論を実行します。 # --------------------------------------- import xgboost as xgb import pandas as pd import matplotlib.pyplot as plt # --- モデルの場所 --- model_gcs_uri = model.uri # GCSのパスを直接指定する場合 # model_gcs_uri = WORK_DIR + "/aiplatform-custom-training-xxxxx/model" print (f "参照モデル: {model_gcs_uri}" ) # 推論データのダウンロード !gsutil cp {model_gcs_uri}/model.bst . !gsutil cp {WORK_DIR}/data/test.csv . print ( "ダウンロード完了。推論を開始します..." ) # モデルの読み込み local_model = xgb.XGBRegressor() local_model.load_model( "model.bst" ) # データの読み込み df_test = pd.read_csv( "test.csv" ) # 推論用に「正解列」を削除したデータフレームを作ります target_col = "MedHouseVal" X_test = df_test.drop(columns=[target_col]) # 推論の実行 # .values をつけて数値行列として渡します predictions = local_model.predict(X_test.values) # --- 結果の表示 --- print ( " \n === 推論結果 (最初の10件) ===" ) for i, pred in enumerate (predictions[: 10 ]): print (f "データ{i}: 予測価格 {pred:.4f}" ) 推論結果の確認 最後に予測結果と正解データを照合して評価指標を計算し散布図を描画します。 from sklearn.metrics import r2_score, mean_squared_error import numpy as np # 正解データの抽出 y_true = df_test[ "MedHouseVal" ] # 精度評価 (答え合わせ) r2 = r2_score(y_true, predictions) rmse = np.sqrt(mean_squared_error(y_true, predictions)) print ( "-" * 30 ) print (f "評価結果 (データ数: {len(y_true)}件)" ) print (f "R2 Score (決定係数): {r2:.4f}" ) print (f "RMSE (誤差の大きさ): {rmse:.4f}" ) print ( "-" * 30 ) # 結果の可視化 (散布図) plt.figure(figsize=( 8 , 8 )) plt.scatter(y_true, predictions, alpha= 0.5 , color= 'blue' , label= 'Predictions' ) # 理想線 (y=x) min_val = min (y_true.min(), predictions.min()) max_val = max (y_true.max(), predictions.max()) plt.plot([min_val, max_val], [min_val, max_val], 'r--' , label= 'Ideal Fit' ) plt.title(f "Actual vs Predicted (R2: {r2:.3f})" ) plt.xlabel( "Actual Price" ) plt.ylabel( "Predicted Price" ) plt.legend() plt.grid( True ) plt.show() 片岩 裕貴 (記事一覧) クラウドソリューション部 クラウドディベロッパー課 和歌山県在住のエンジニア。興味分野はAI/ML。Google Cloud Partner Top Engineer に選出(2025 / 2026)。
.table-of-contents > li > ul > li > ul { display: none } はじめに こんにちは、データシステム部MLOpsブロックの 木村 です。MLOpsブロックではZOZOTOWN、WEAR by ZOZOをはじめとして、弊社で提供するさまざまなサービスに関わるML機能を開発・運用しています。 本記事で紹介する ZOZOマッチ (以下、本アプリ)は「ファッションで恋する」をコンセプトとしたマッチングアプリです。本アプリもML機能を持ち、MLOpsブロックが機能を開発・運用しています。 本アプリの特徴的なML機能として ファッションジャンル診断 (以下、ジャンル診断)があります。ジャンル診断とは全身画像をストリートやモードなどZOZOが定義した12種類のファッションジャンルに分類し、該当するジャンルの割合を「ジャンル傾向」として円グラフで表示する機能です。 ジャンル診断には2つの利用方法があります。1つ目はプロフィールに登録した自分の全身画像から自分の「ジャンル傾向」を判定する方法です。2つ目は好みのコーディネート画像を複数枚選んで自分の「好みの雰囲気のジャンル傾向」を判定する方法です。これら2つの利用方法によって「ファッションで恋する」レコメンドを実現しています。 このジャンル診断を実現するために、全身画像からファッションジャンルを判定する「ジャンル診断API」を開発しました。ジャンル診断APIはMLモデルによる推論処理を行い、全身画像を入力として12種類のジャンルそれぞれに属する確率を返します。 今回、このジャンル診断APIの推論サーバーとして Triton Inference Server (以下、Triton)を導入しました。TritonはNVIDIAが提供するオープンソースの推論サーバーです。 本記事ではジャンル診断APIの開発で推論サーバーにTritonを採用した背景から、導入時に遭遇した課題とその解決策、導入効果までを説明します。Tritonの導入にあたり 公式ドキュメント の説明だけでは理解しにくかった点もあり、いくつかの課題に直面しました。これらの知見がTritonを使った推論サーバーの構築・運用を検討する際の参考になれば幸いです。 本記事の内容は2026年2月時点の情報であることにご留意ください。 目次 はじめに 目次 Tritonを導入した背景 従来のML推論API構成とその課題 推論サーバーの選定 Tritonを採用するメリット 推論処理の高速化 処理の分離による保守性向上 設定変更の容易さ 推論単体の性能検証 ジャンル診断APIへの導入経緯 システム構成 全体構成 リクエストの流れ 通信プロトコルの選定 Tritonがサポートするプロトコル HTTPとgRPCの比較 Kubernetesリソースの構成 Tritonコンテナの起動設定 Deploymentマニフェスト InitコンテナとVolume Mountによるモデル共有 OpenTelemetryによるトレース設定 ヘルスチェック設定 セキュリティ設定 Triton Podへの接続設定 GCSの認証設定 オートスケーリングの設定 モデルのディレクトリ構成 モデルの設定 推論ゲートウェイAPIの実装 Triton接続の設定 StyleFeatureExtractorクラスの実装 推論リクエスト処理の実装 導入時の課題と解決策 課題1:gRPCで負荷分散されない 原因 解決策 1. Headless Serviceの採用 2. クライアント側でround_robinロードバランシングを設定 スケールアウト時の負荷分散 課題2:Datadog APMでトレースが連携されない 原因 解決策 1. W3C Trace Contextによるトレース伝播 2. Triton専用NodeにDatadog AgentをDaemonSetで配置 3. Tritonの起動オプションでトレース設定 課題3:PyTorchモデルがロードされない 原因 解決策 導入効果 導入時の注意点 コスト面の考慮 まとめ Tritonを導入した背景 従来のML推論API構成とその課題 MLOpsブロックでは従来、前処理から推論、後処理までの全処理を単一のAPI内で実行していました。Pod起動時に Google Cloud Storage (以下、GCS)からMLモデルをダウンロードし、入力データの取得から後処理までを同一プロセス内で実行する構成です。 この構成には2つの課題がありました。 1つ目はリソース効率が悪いことです。画像ダウンロードのようなI/Oバウンドな処理とMLモデルでの推論のようなCPU/GPUバウンドな処理ではボトルネックが異なります。しかし単一のAPIではこれらを分離できません。このリソース効率の悪さはパフォーマンス面とコスト面の両方に影響がありました。パフォーマンス面では、どちらか一方がボトルネックでもAPI全体を水平・垂直スケールする必要があり、ボトルネックではない処理のリソースまで増やすことになっていました。コスト面では、推論にGPUを使用したい場合、すべてのAPI PodにGPUを割り当てる必要があります。ただし実際にGPUを使うのは推論処理の間だけです。前処理・後処理の間もGPUを占有し続けるためGPU使用率が低くなり、無駄なコストが発生していました。 2つ目は推論速度の向上が難しいことです。ML推論APIではモデルの推論処理に要する時間が全体のレイテンシの大部分を占めます。しかし推論処理は単一のAPI内に組み込まれていることで、汎用的なWebフレームワーク上で実行されるため推論に特化した最適化手段が限られていました。そのため推論のレイテンシ改善が難しく、リアルタイム推論を活用したUXの良い機能の提供が困難でした。 推論サーバーの選定 これらの課題を解決するため推論サーバーの導入を検討しました。主要な推論サーバーを比較した結果は次の通りです。 名称 対応フレームワーク 実装言語 TorchServe PyTorch専用 Java/Python TensorFlow Serving TensorFlow専用 C++ BentoML 複数対応 Python Triton Inference Server 複数対応 C++ MLOpsブロックではPyTorchやTensorFlowなど複数のMLフレームワークを利用しています。また今回のジャンル診断だけでなく今後の別プロジェクトでも活用できる共通基盤として、複数フレームワークに対応しつつ高速な推論基盤が必要でした。 TorchServe と TensorFlow Serving はどちらも特定フレームワークのモデルで手軽にAPIを作成したい場合に有効です。しかし、複数フレームワークに対応していません。 BentoML は様々なMLフレームワークに対応していますが、Python実装のためパフォーマンス面で不利な場合があります。 一方でTritonは様々なMLフレームワークに対応しておりC++実装のため高速です。構成が複雑でキャッチアップコストはありますが、汎用性・機能性が最も高く共通基盤として最適と判断しTritonを選定しました。 Tritonを採用するメリット 複数の推論バックエンドに対応していることに加えてTritonを導入するメリットは次の4つがあります。 推論処理の高速化 処理の分離による保守性向上 設定変更の容易さ 推論単体の性能検証 以降で各メリットについて詳しく説明します。 推論処理の高速化 FastAPIなどPythonベースの推論サーバーは汎用的なWebフレームワークであり推論処理に最適化されていません。一方Tritonは推論処理に特化したサーバーです。リクエストの受付と推論実行を別々のスレッドプールで処理するため、推論中も新しいリクエストを受け付けられます。またC++で実装されているためPythonのGlobal Interpreter Lockの制約を受けず、推論処理をマルチスレッドで並列実行できます。 また Dynamic Batching により複数の推論リクエストをサーバー側でバッチ処理できます。最初のリクエストを受け取ってから指定した待機時間内に届いたリクエストを動的にまとめることで、個別処理よりもスループットが向上します。 さらに instance_group により同一モデルの複数インスタンスを並列実行できます。デフォルトでは同時に1つのリクエストのみ実行されますが、インスタンス数を増やすことで複数のリクエストを並列処理できます。 処理の分離による保守性向上 推論処理をTritonに委譲することでAPIサーバーは前処理・後処理に、Tritonは推論に専念できます。この分離により各コンポーネントの責務が明確になりコードの保守性が向上します。 スケーリングの観点でも大きなメリットがあります。具体的には水平スケール(Pod数の増減)や垂直スケール(CPUやメモリの割り当て変更)をAPIサーバーとTritonそれぞれに対して個別に適用できるようになります。推論がボトルネックの場合はTriton Podのみを、前処理・後処理がボトルネックの場合はAPIサーバー Podのみをそれぞれ独立してスケールできます。従来の一体型構成では推論のボトルネック解消のためにPod全体をスケールする必要がありました。分離構成ではボトルネックとなっているコンポーネントのみをスケールすればよくリソース効率が向上します。またAPIサーバーとTritonのメトリクスを個別に監視することでどちらがボトルネックになっているかを把握しやすくなります。 設定変更の容易さ Tritonではモデルの設定を config.pbtxt で宣言的に管理します。バッチサイズの上限、モデルインスタンス数、Dynamic Batchingの待機時間などをコード修正なしで変更できます。 従来のPythonベースの実装でもバッチサイズ等を環境変数で外部化できます。一方 config.pbtxt では max_batch_size や dynamic_batching 、 instance_group など推論サーバーに必要な設定項目が標準化されています。これらの設定項目を自前で設計する必要がありません。また config.pbtxt とモデルファイルを含むディレクトリをそのまま別環境にコピーすれば動作するため、環境間での設定の共有も容易になります。 推論単体の性能検証 従来の構成では推論性能の測定にはAPI全体への検証が必要であり、前処理・後処理の影響を含んだ結果しか得られませんでした。 Triton公式の Perf Analyzer を使用することで推論部分のみの性能を切り出して測定できます。Perf AnalyzerはTritonで実行される機械学習モデルの推論パフォーマンスを測定・最適化するためのCLIツールです。汎用的な負荷試験ツールでTritonをテストする場合はテンソル形式でのリクエスト組み立てを自前で実装する必要があります。一方Perf Analyzerではモデルのメタデータを自動取得してリクエストを生成するためその実装が不要です。さらにサーバー内部のレイテンシをqueueとcomputeに分解して出力します。queueは推論スケジューラのキュー内でリクエストが待機していた時間を示し、computeは入力データの準備、実際の推論実行、出力データの取得にかかった時間を示します。この分解によりボトルネックの切り分けが容易になり、モデル更新時のパフォーマンス確認や推論パラメータの調整に役立ちます。 ジャンル診断APIへの導入経緯 ここではなぜジャンル診断APIにTritonを導入したのかを説明します。 ジャンル診断APIは「ファッションで恋する」を実現するZOZOマッチの根幹となる機能です。このAPIには低レイテンシとスケーラビリティの2つの要件がありました。 オンボーディング時に使用されるAPIのため遅延があるとユーザー離脱につながります。またプロフィール画像を変更するたびに診断が実行されるためユーザー増加に伴いスケーラビリティも必要です。前述の通りTritonはC++実装やDynamic Batchingにより低レイテンシと高スループットを実現でき、これらの要件に適していると判断しました。 さらにTritonは社内での導入実績がありませんでした。既存の本番サービス(ZOZOTOWN、WEAR等)は利用者数が多く、問題発生時の影響範囲が大きくなります。一方ZOZOマッチはサービス立ち上げ段階であり、ZOZOTOWNやWEARなどの既存サービスと比較するとユーザー数が少ないです。また新規構築のため既存システムとの依存関係も少なく、新技術の検証に適していると判断しました。 以降ではジャンル診断APIのシステム構成とTritonを利用した推論API開発時に遭遇した課題について説明します。 システム構成 全体構成 本システムではAPIサーバーとTritonを分離する構成を採用しました。前述の「 処理の分離による保守性向上 」で説明した通り、この構成により責務の明確化とボトルネックに応じた柔軟なスケーリングが可能になります。 リクエストの流れ 次の図にリクエストの流れを示します。 ユーザーが自分の全身画像をプロフィール画像として登録するとBackend Serverがその画像データをAPIサーバーに送信します。APIサーバーはリサイズと正規化の前処理を行いgRPCプロトコルでTritonに推論リクエストを送信します。Tritonは12種類のファッションジャンルそれぞれに対して0〜1のスコアを返します。スコアが高いほどそのジャンルに該当する可能性が高いことを示します。以下はレスポンス例です。 { " genres ": [ { " genre_id ": 1 , " score ": 0.045 } , { " genre_id ": 2 , " score ": 0.696 } , ... { " genre_id ": 12 , " score ": 0.333 } ] } APIサーバーがこの診断結果をBackend Serverに返し、最終的にユーザーへレスポンスが返却されます。なおオンボーディング時に表示される選択肢画像についてはバッチ処理で事前にスコアを算出済みのためリアルタイムでの推論は不要です。リアルタイム推論が必要なのはユーザーがプロフィール画像を登録・変更した場合のみです。 通信プロトコルの選定 上記の通りAPIサーバーとTriton間の通信にはgRPCを採用しています。ここではTritonがサポートするプロトコルとgRPCを選定した理由を説明します。 Tritonがサポートするプロトコル TritonはHTTPとgRPCの2種類のプロトコルで推論リクエストを受け付けます。HTTPはポート8000、gRPCはポート8001で公開されます。どちらのプロトコルも同じ推論機能を提供しますがパフォーマンス特性が異なります。 HTTPとgRPCの比較 TritonのHTTPエンドポイントはHTTP/1.1ベースで、curlや各種HTTPライブラリで簡単にリクエストでき、ブラウザからも直接アクセスできるため広く利用されています。一方gRPCはHTTP/2ベースでサーバー間の内部通信に適していますが、ブラウザAPIでサポートされていないためブラウザから直接呼び出すことができません。しかし本システムではAPIサーバーからTritonへのサーバー間通信のみを対象としておりブラウザからのアクセスは不要なため、この制約は問題になりません。 gRPCがサーバー間通信に適している理由はHTTP/2による多重化です。HTTP/1.1では1つのTCP接続で同時に1つのリクエストしか処理できないため、高頻度のリクエストでは接続のオーバーヘッドが課題となります。HTTP/2では単一の接続上で複数のリクエストを同時に処理できます。本システムではAPIサーバーからTritonへ頻繁にリクエストを送信するためこの多重化の特性が有効です。 一方でgRPCを採用する際の注意点があります。HTTP/2の長時間接続がKubernetesの負荷分散と相性が悪いという問題です。この課題と解決策については後述の「 課題1:gRPCで負荷分散されない 」で説明します。 Kubernetesリソースの構成 MLOpsブロックではパブリッククラウドにGoogle Cloudを使用し Google Kubernetes Engine (以下、GKE)上にサービスを構築しています。ここではTritonをGKE上でデプロイするために作成したKubernetesリソースを解説します。TritonをGKE上で動かすために以下のリソースを作成しました。 Deployment :Triton Podの稼働 Service :Triton Podへのネットワークアクセスの提供 ServiceAccount :GCSアクセスのための認証設定 ScaledObject :KEDAによるオートスケーリングの設定 Tritonコンテナの起動設定 TritonをデプロイするためのDeployment構成を解説します。まず全体のマニフェストを示し、その後に各設定の詳細を説明します。 Deploymentマニフェスト apiVersion : apps/v1 kind : Deployment metadata : name : triton-inference-server labels : app : triton-inference-server spec : selector : matchLabels : app : triton-inference-server template : metadata : labels : app : triton-inference-server spec : serviceAccountName : triton-inference-server affinity : nodeAffinity : requiredDuringSchedulingIgnoredDuringExecution : nodeSelectorTerms : - matchExpressions : - key : cloud.google.com/gke-nodepool operator : In values : - "<triton-nodepool-name>" tolerations : - key : "dedicated" operator : "Equal" value : "<triton-nodepool-name>" effect : "NoSchedule" initContainers : - name : gcloud image : gcr.io/google.com/cloudsdktool/cloud-sdk:418.0.0 env : - name : MODEL_REPOSITORY_PATH value : gs://your-bucket/model_repository - name : MODEL_DIR value : "/tmp/models/" volumeMounts : - name : models mountPath : "/tmp/models/" command : - "/bin/sh" - "-c" - "gsutil cp -r $(MODEL_REPOSITORY_PATH)/* $(MODEL_DIR)" containers : - name : triton-inference-server image : "nvcr.io/nvidia/tritonserver:25.03-py3" imagePullPolicy : IfNotPresent resources : limits : cpu : 3000m memory : 3Gi command : [ "tritonserver" ] args : - "--model-store=/tmp/models/" - "--trace-config" - "mode=opentelemetry" - "--trace-config" - "opentelemetry,url=http://$(DD_AGENT_HOST):4318/v1/traces" - "--trace-config" - "rate=1" - "--trace-config" - "level=TIMESTAMPS" - "--grpc-max-connection-age=30000" - "--grpc-max-connection-age-grace=10000" env : - name : DD_AGENT_HOST valueFrom : fieldRef : fieldPath : status.hostIP volumeMounts : - name : models mountPath : "/tmp/models/" ports : - containerPort : 8000 name : http - containerPort : 8001 name : grpc livenessProbe : initialDelaySeconds : 10 periodSeconds : 5 failureThreshold : 6 httpGet : path : /v2/health/live port : http readinessProbe : initialDelaySeconds : 10 periodSeconds : 5 failureThreshold : 6 httpGet : path : /v2/health/ready port : http volumes : - name : models emptyDir : {} securityContext : runAsUser : 1000 fsGroup : 1000 以下、マニフェストの各設定について詳しく解説します。 InitコンテナとVolume Mountによるモデル共有 本システムでは Initコンテナ でGCSからMLモデルをダウンロードしています。InitコンテナはPod内でアプリケーションコンテナの前に実行されるコンテナで、マニフェストの initContainers フィールドで定義します。ダウンロード先のディレクトリをTritonコンテナでもVolume MountすることでダウンロードしたMLモデルをTritonから参照できる仕組みです。 initContainers : - name : gcloud image : gcr.io/google.com/cloudsdktool/cloud-sdk:418.0.0 env : - name : MODEL_REPOSITORY_PATH value : gs://your-bucket/model_repository - name : MODEL_DIR value : "/tmp/models/" volumeMounts : - name : models mountPath : "/tmp/models/" command : - "/bin/sh" - "-c" - "gsutil cp -r $(MODEL_REPOSITORY_PATH)/* $(MODEL_DIR)" containers : - name : triton-inference-server volumeMounts : - name : models mountPath : "/tmp/models/" volumes : - name : models emptyDir : {} emptyDir はPod起動時に作成される一時ボリュームで同一Pod内のコンテナ間でデータを共有できます。 Triton Pod起動時の処理の流れは次の通りです。 Initコンテナが起動し、GCSからモデルリポジトリ全体を /tmp/models/ にダウンロード Initコンテナが正常終了 アプリケーションコンテナ(Triton)が起動し、同じ /tmp/models/ からモデルをロード この構成によりモデルファイルをコンテナイメージに含める必要がなく、モデル更新時もイメージの再ビルドが不要になります。 OpenTelemetryによるトレース設定 本システムではDatadog APMでモニタリングを行っています。Tritonは OpenTelemetry (以下、OTel)形式でのトレース出力に対応しています。そのためTritonコンテナ起動時に以下の引数を指定することでDatadog APMと連携できます。 args : - "--trace-config" - "mode=opentelemetry" - "--trace-config" - "opentelemetry,url=http://$(DD_AGENT_HOST):4318/v1/traces" - "--trace-config" - "rate=1" - "--trace-config" - "level=TIMESTAMPS" env : - name : DD_AGENT_HOST valueFrom : fieldRef : fieldPath : status.hostIP 各パラメータの意味は次の通りです。 パラメータ 設定値 説明 mode opentelemetry OTelのAPIを使用してトレースを出力 opentelemetry,url http://$(DD_AGENT_HOST):4318/v1/traces トレースデータの送信先 rate 1 N番目のリクエストごとにトレース。 1 は全リクエスト、 1000 なら1000件に1件(デフォルト: 1000) level TIMESTAMPS 各リクエストの実行タイムスタンプを記録 DD_AGENT_HOST には status.hostIP (NodeのIP)を指定しています。Datadog Agentの推奨構成ではUNIXドメインソケットを使用します。しかしTriton(25.03時点)はOTLP Exporter経由のトレース送信においてUNIXドメインソケットに対応しておらず、HTTPプロトコルのみに対応しています。そのためホストIPとポートを指定する構成としました。 ヘルスチェック設定 livenessProbe : httpGet : path : /v2/health/live port : http readinessProbe : httpGet : path : /v2/health/ready port : http Tritonがサポートするヘルスチェックエンドポイントを livenessProbe、readinessProbe に設定しています。 /v2/health/live はTritonがリクエストを受け取れる状態かを確認し /v2/health/ready はすべてのモデルが正常にロードされたかを確認します。 セキュリティ設定 securityContext : runAsUser : 1000 fsGroup : 1000 セキュリティの観点からコンテナは非rootユーザーで実行しています。 runAsUser でコンテナ内のプロセスを実行するユーザーIDを指定しrootでの実行を避けています。 fsGroup でマウントされたボリュームのグループ所有権を同じIDに設定することで非rootユーザーでもモデルファイルへアクセスできるようにしています。 Triton Podへの接続設定 APIサーバーからTritonに接続するためのServiceを作成します。 KubernetesのServiceは複数のPodへのアクセスを抽象化するリソースです。通常のClusterIP ServiceではServiceに仮想IP(Cluster IP)が割り当てられ、クライアントはこの単一のIPアドレスにアクセスします。コンテナからの通信は kube-proxy によりDNATされ背後のPodにロードバランシングされます。 一方 Headless Service は clusterIP: None を指定したServiceであり仮想IPが割り当てられません。DNS名前解決ではServiceに紐づくすべてのPodのIPアドレスが直接返されます。これによりクライアント側で接続先のPodを選択できます。 本システムではHeadless Serviceを使用しています。通常のClusterIP Serviceを使用しない理由はgRPCの負荷分散に関係しています。詳細は後述の「 課題1:gRPCで負荷分散されない 」で説明します。 以下がServiceのマニフェストです。 apiVersion : v1 kind : Service metadata : name : triton-inference-server labels : app : triton-inference-server spec : type : ClusterIP clusterIP : None ports : - port : 8000 targetPort : http name : http-inference-server - port : 8001 targetPort : grpc name : grpc-inference-server selector : app : triton-inference-server 前述の通り8000番ポート(HTTP)はヘルスチェック用、8001番ポート(gRPC)は推論リクエスト用に公開しています。 GCSの認証設定 InitコンテナがGCSからモデルをダウンロードするにはGCSへのアクセス権限が必要です。GKEでは Workload Identity を使用することでKubernetesのServiceAccountとGCPのサービスアカウントを紐付けられます。これによりPod内からサービスアカウントキーを管理することなくGCPリソースにアクセスできます。 apiVersion : v1 kind : ServiceAccount metadata : name : triton-inference-server annotations : iam.gke.io/gcp-service-account : <gcp-service-account>@<project-id>.iam.gserviceaccount.com オートスケーリングの設定 KEDAを使用してCPU使用率に基づくオートスケーリングを設定しています。KEDAはKubernetes Event-driven Autoscalingの略で様々なメトリクスに基づいてPod数を自動調整できます。 KEDAについては下記の記事でも触れられていますのでぜひこちらもご参照ください。 techblog.zozo.com apiVersion : keda.sh/v1alpha1 kind : ScaledObject metadata : name : triton-inference-server spec : scaleTargetRef : name : triton-inference-server maxReplicaCount : 5 minReplicaCount : 2 cooldownPeriod : 300 triggers : - type : cpu metricType : Utilization metadata : value : "50" この設定ではCPU使用率50%を超えるとスケールアウトし、最小2台〜最大5台の範囲でPod数を調整します。最小2台としているのは可用性を担保するためです。 モデルのディレクトリ構成 Tritonでモデルを管理するには所定のディレクトリ構成に従う必要があります。本システムではその制約に従い、GCS上のモデルリポジトリを次のような構成にしました。 model_repository/ └── genre_extract_torchscript/ # モデル名(config.pbtxtのnameを指定する場合は一致させる) ├── config.pbtxt # モデルの入出力形式やバッチサイズなどを定義 ├── 1/ # モデルのバージョン │ └── model.pt └── 2/ # モデルのバージョン └── model.pt ディレクトリ名がそのままモデル名として使われます。今回は genre_extract_torchscript です。config.pbtxtで name フィールドを指定する場合はディレクトリ名と一致させる必要があります。 1/ や 2/ はバージョンを示すディレクトリでバージョン番号をディレクトリ名として使用します。この構成により複数バージョンのモデルを同時に管理でき、バージョン切り替えも容易になります。 実際にロードするバージョンはconfig.pbtxtのversion policyで制御でき、デフォルトでは最新バージョンのみがロードされます。config.pbtxtの詳細は「 モデルの設定 」で説明します。 モデルの設定 Tritonではモデルごとに config.pbtxt を用意します。config.pbtxtではモデルの入出力形式やバッチサイズなどを定義します。ジャンル診断モデルで使用しているconfig.pbtxtを以下に示します。 name: "genre_extract_torchscript" platform: "pytorch_libtorch" max_batch_size: 1 input [ { name: "image_input" data_type: TYPE_FP32 format: FORMAT_NCHW dims: [ 3, 224, 224 ] } ] output [ { name: "genre_scores" data_type: TYPE_FP32 dims: [ 12 ] } ] この設定では224×224ピクセルのRGB画像を入力として受け取り、12次元のベクトルを出力します。本システムでは12種類のファッションジャンルに対するスコアに対応しています。 platform フィールドの pytorch_libtorch はTorchScript形式のPyTorchモデルを使用することを示しています。 input[].format フィールドのNCHW形式とは画像データの配列順序を表しており、N(バッチサイズ)、C(チャンネル数=3)、H(高さ=224)、W(幅=224)の順にデータが並びます。 前述の「 推論処理の高速化 」で説明したDynamic Batchingを有効にするには以下のように設定します。 dynamic_batching { max_queue_delay_microseconds: 100 } max_queue_delay_microseconds はバッチを形成するためにリクエストを待機する最大時間をマイクロ秒で指定します。この待機時間内に届いたリクエストをまとめて1回の推論で処理します。 本システムでは max_batch_size: 1 としてDynamic Batchingを無効にしています。リリース直後でリクエスト数が少なくバッチを形成するための待機時間がレイテンシ悪化につながるためです。リクエスト数が増加した場合は max_batch_size を増やし dynamic_batching を有効にすることでスループットを向上できます。 GPUで推論する場合は instance_group で kind: KIND_GPU を指定します。 instance_group [ { kind: KIND_GPU count: 2 } ] count はGPUごとに起動するモデルインスタンス数を指定します。本システムではCPUで推論しています。リリース直後はリクエスト数が少なくCPUでも十分な処理性能を確保できるためです。GPUはCPUと比べてコストが高いため、リクエスト数の増加によりCPUでの処理が追いつかなくなった段階でGPUへの移行を検討します。 推論ゲートウェイAPIの実装 推論ゲートウェイAPIはBackend Serverからのリクエストを受け取り推論結果を返すAPIです。本システムでは tritonclient ライブラリのgRPCクライアントを使用しています。 Triton接続の設定 tritonclientで推論リクエストを送信する際の設定項目は以下の通りです。 設定項目 必須/任意 説明 接続先URL 必須 Tritonサーバーのホストとポート。gRPCのデフォルトポートは8001 モデル名 必須 config.pbtxtの name フィールドと一致させる モデルバージョン 任意 使用するモデルのバージョン番号。省略時は最新バージョンを使用 タイムアウト 任意 リクエストのタイムアウト時間 gRPCチャンネル設定 任意 gRPCチャンネルに渡す引数 本システムではこれらの設定を以下のように定義しています。 # 接続先URL: KubernetesのHeadless Service名とgRPCポートを指定 triton_url = "triton-inference-server.match-genre-extract.svc.cluster.local:8001" # モデル名: config.pbtxtのnameフィールドと一致させる model_name = "genre_extract_torchscript" # モデルバージョン model_version = "1" # タイムアウト timeout_ms = 1000 # gRPCチャンネル設定: ロードバランシングポリシーをround_robinに設定 channel_args = [( "grpc.lb_policy_name" , "round_robin" )] channel_args でgRPCクライアントのロードバランシングポリシーを設定できます。値の指定によりTritonサーバーへの負荷分散の方法を指定できます。この設定の詳細は後述の「 課題1:gRPCで負荷分散されない 」で説明します。 StyleFeatureExtractorクラスの実装 Tritonへの推論リクエストを行うクラスを実装します。なおコード例では入出力名やデータ型といったモデル依存の設定値をハードコーディングしていますが、実際の運用では環境変数から取得しています。 import numpy as np import tritonclient.grpc as grpcclient from opentelemetry.propagate import inject from tritonclient.grpc import InferResult from api.env_settings import settings class StyleFeatureExtractor : _INPUT_NAME = "image_input" _OUTPUT_NAME = "genre_scores" _DATA_TYPE_FP32 = "FP32" def __init__ (self): self.triton_client = grpcclient.InferenceServerClient( url=settings.triton_url, channel_args=settings.channel_args ) def predict_score_and_vector (self, image_data: np.ndarray) -> list [MLGenreScore]: trace_headers = {} inject(trace_headers) inputs = [ grpcclient.InferInput( self._INPUT_NAME, image_data.shape, self._DATA_TYPE_FP32 ) ] inputs[ 0 ].set_data_from_numpy(image_data) outputs = [grpcclient.InferRequestedOutput(self._OUTPUT_NAME)] response: InferResult = self.triton_client.infer( model_name=settings.model_name, inputs=inputs, outputs=outputs, model_version=settings.triton_model_version, headers=trace_headers, timeout=settings.triton_timeout_ms, ) scores = response.as_numpy(self._OUTPUT_NAME) _INPUT_NAME と _OUTPUT_NAME は前述の config.pbtxt で定義した入出力名と一致させます。 __init__ では前述の設定値を使って InferenceServerClient を初期化します。 predict_score_and_vector メソッドではW3C Trace Contextヘッダーを準備しています。これはDatadog APMとの連携に使用するもので、詳細は「 課題2-Datadog APMでトレースが連携されない 」で説明します。 推論リクエスト処理の実装 前述の StyleFeatureExtractor を使用して推論します。まず前処理としてダウンロードした画像をモデルの入力形式に変換します。tritonclientは入力データとしてnumpy配列を受け取るため、PyTorchテンソルから変換して渡します。 # 前処理 PREDICT_IMAGE_TRANSFORM = transforms.Compose([ transforms.Resize(( 224 , 224 )), transforms.ToTensor(), transforms.Normalize(mean=[ 0.485 , 0.456 , 0.406 ], std=[ 0.229 , 0.224 , 0.225 ]), ]) img_tensor = PREDICT_IMAGE_TRANSFORM(img) image_data = img_tensor.unsqueeze( 0 ).numpy() # 推論リクエスト ml_genre_scores = STYLE_FEATURE_EXTRACTOR.predict_score_and_vector(image_data) # 後処理... transforms.Resize で224×224ピクセルにリサイズし、 transforms.ToTensor でPyTorchテンソルに変換します。 transforms.Normalize ではImageNetデータセットの平均と標準偏差で正規化しています。最後にtritonclient用として unsqueeze(0) でバッチ次元を追加し numpy() でnumpy配列へ変換します。 推論リクエストでは前述の StyleFeatureExtractor の predict_score_and_vector メソッドを呼び出してTritonに送信します。 後処理では、Tritonから返却された12種類のファッションジャンルに対するスコアを、サービス要件に合わせたジャンルIDへマッピングして診断結果を生成します。詳細は本記事のスコープ外のため省略します。 以上が推論ゲートウェイAPIの実装です。 導入時の課題と解決策 ここまでTritonの導入方法を説明しましたが、導入にあたりいくつかの課題に直面しました。本節ではそれらの課題とその解決策を説明します。 課題1:gRPCで負荷分散されない Tritonサーバーでデプロイした推論APIの性能を測るために実施した負荷試験においてTriton Podを1台から複数台構成にしても処理できるリクエスト数がほとんど増加しませんでした。Pod数を増やして、適切に負荷分散がされれば、処理できるリクエスト数は増加します。 調査の結果、負荷が均等に分散されておらず、特定のPodのCPU使用率が100%近くに達する一方で他のPodはほとんど使用されていないことが判明しました。 原因 この問題はgRPCの特性およびKubernetesの負荷分散の仕組みに起因していました。 gRPCはHTTP/2上に構築されたL7プロトコルです。HTTP/1.1では1つのTCPコネクション上で同時に1つのリクエストしか処理できないため複数の並行リクエストには複数のTCPコネクションが必要です。一方HTTP/2では単一の長寿命なTCPコネクションを維持しその上で複数のリクエストを多重化します。 しかしこの特性はKubernetesの負荷分散との相性に問題があります。KubernetesにおいてService経由のトラフィックの負荷分散を担うのはkube-proxyです。kube-proxyはiptablesやIPVSを使用してTCPコネクション単位で負荷分散を行いコネクションが確立されるタイミングで振り分け先のPodを決定します。ただしkube-proxyはL4での負荷分散を行うため、L7プロトコルであるgRPCの個々のリクエストを認識できません。HTTP/2では単一のTCPコネクションが長時間維持されるためそのコネクション上のすべてのgRPCリクエストが最初の振り分け先Podへ集中してしまいます。 次の図に問題のある構成を示します。クライアントからのTCPコネクションが1つのPodに集中し他のPodはアイドル状態になっています。 解決策 gRPCの負荷分散の問題に対しては主に2つの解決策があります。 1つ目は Linkerd や Istio などのサービスメッシュの導入です。各Podにサイドカープロキシを注入してL7で負荷分散を行うためアプリケーションコードを変更せずにgRPC負荷分散を実現できます。一方で各Podにサイドカーが必要となりCPUやメモリを消費します。またコントロールプレーンの運用も必要になります。 2つ目はクライアントサイド負荷分散です。gRPCクライアントの設定でHeadless ServiceとDNSベースの負荷分散を利用します。サイドカーを経由しないため高いパフォーマンスを実現できます。一方でクライアント側での設定変更が必要であり負荷分散アルゴリズムはround_robinなど基本的なものに限定されます。 本システムではクライアントサイド負荷分散を採用しました。追加コンポーネントが不要で既存のKubernetesリソースとgRPCクライアントの設定変更のみで実現できるためです。また本システムではAPIサーバーからTritonへの内部通信のみが対象でありクライアント側の設定変更が可能です。推論処理の所要時間がほぼ一定であるためround_robinによる均等分散で十分な負荷分散の効果が得られると判断しました。具体的には次の2つの変更をしました。 1. Headless Serviceの採用 通常のClusterIP Serviceの代わりにHeadless Service( clusterIP: None )を使用しました。Headless ServiceはDNSクエリに対してPodのIPアドレスリストを直接返すためクライアント側で接続先を制御できます。 2. クライアント側でround_robinロードバランシングを設定 gRPCクライアントの設定で round_robin ロードバランシングポリシーを指定しました。これによりDNSで取得した複数のPod IPに対してリクエストを均等に分散できます。 tritonclient ライブラリでは InferenceServerClient の channel_args パラメータでgRPCチャンネルオプションを設定できます。 channel_args: list [ tuple [ str , str ]] = [( "grpc.lb_policy_name" , "round_robin" )] self.triton_client = grpcclient.InferenceServerClient( url=settings.triton_url, channel_args=settings.channel_args # round_robinを設定 ) grpc.lb_policy_name を round_robin に設定することでHeadless ServiceのDNSが返す複数のPod IPに対してリクエストごとに接続先を切り替えます。 次の図に解決後の構成を示します。クライアントがDNSから取得した複数のPod IPに対してround_robinでリクエストを分散しています。 スケールアウト時の負荷分散 上記のクライアントサイド負荷分散により起動時に存在するTriton Pod間での負荷分散は実現できました。しかし HPA などによるスケールアウトで新しいPodを追加した場合、そのPodへトラフィックを分散できない問題が残りました。 gRPCクライアントはすべてのSubchannelが切断された場合のみDNS名を再解決する仕様になっています。本システムではAPIサーバー起動時にTritonクライアントを生成しアプリケーション動作中は同じ接続を維持します。そのため既存の接続が維持されている限りDNS再解決は行われずスケールアウトで追加された新しいPodを検出できません。 この問題に対してはサーバー側で接続が存続できる最大時間を設定することで解決しました。gRPCは接続がクローズされるとDNS名を再解決するためサーバー側で定期的に接続を切断することでクライアントに再接続を促し、新しいPodを含めた負荷分散を実現できます。 具体的にはTritonの起動オプションで以下を設定しました。 --grpc-max-connection-age=30000 --grpc-max-connection-age-grace=10000 grpc-max-connection-age は接続が存在できる最大時間をミリ秒で指定するパラメータです。この時間を超えるとサーバーが接続を終了します。また grpc-max-connection-age-grace は接続終了後に処理中のRPCが完了するまでの猶予時間です。 30秒という値は新しいPodがReadyになってから最大30秒で負荷分散が開始されることを意味します。HPAによるスケールアウト自体に数分かかることを考えると30秒の遅延は許容範囲です。10秒という猶予時間は本システムの推論処理が非機能要件で1秒以内と定義されているため処理中のRPCを完了させるのに十分な時間です。 これらの変更によりPod数の増加に応じて線形にスループットが向上するようになりました。 課題2:Datadog APMでトレースが連携されない APIサーバーとTritonを分離した構成を導入したところDatadog APMでAPIサーバーとTritonのトレースを連携できない問題が発生しました。従来の一体型構成では1本のトレースとして追跡できていましたが、分離した構成では別々のトレースとして表示され同一リクエストの処理として紐付けられませんでした。障害調査時にエンドツーエンドでレイテンシを分析するためにはトレースの連携が必要でした。 原因 この問題はトレースコンテキストの伝播形式の違いに起因していました。 分散トレーシングではサービス間でトレース情報をHTTPヘッダーにより伝播し、複数のサービスを横断するリクエストを1本のトレースとして追跡します。しかしAPIサーバーとTritonでは伝播形式が異なっていました。 APIサーバー側:Datadog独自形式( x-datadog-* ヘッダー) Triton側: OpenTelemetry 形式のみサポート 各トレーシングツールで独自のヘッダー形式を使用していたため、サービス間でトレース情報が正しく伝播されず、トレースの分断が発生していました。 次の図は Monitor OpenTelemetry with Datadog and W3C Trace Context から引用した異なるトレーサーが混在する環境での問題を示しています。OTel TracerとDD Tracerがそれぞれ独自形式のTrace IDを使用するため各サービスのSpanがDatadogに送信されても同一トレースとして紐付けられません。図右側のAPM FlamegraphではDD Tracerを使用するSERVICE BのSpanのみが表示され、OTel Tracerを使用するSERVICE AとCは別トレース扱いになっています。本システムでも同様の問題が発生していました。 解決策 この問題を解決するために次の3つの対応をしました。 W3C Trace Contextによるトレース伝播 Triton専用NodeにDatadog AgentをDaemonSetで配置 Tritonの起動オプションでトレース設定 1. W3C Trace Contextによるトレース伝播 W3C Trace Contextは異なるトレーシングシステム間でトレース情報を共有するための標準規格です。HTTPヘッダー( traceparent 、 tracestate )を通じてトレースIDを伝播します。 APIサーバーからTritonへのリクエスト時にOTelの inject 関数を使用してトレース情報をヘッダーに埋め込みます。 from opentelemetry.propagate import inject def predict_score_and_vector (self, image_data: np.ndarray): # 現在のトレースコンテキストをW3C Trace Context形式でヘッダーに埋め込む trace_headers = {} inject(trace_headers) # Tritonへの推論リクエスト時にヘッダーを付与 response = self.triton_client.infer( model_name=self.model_name, inputs=inputs, outputs=outputs, headers=trace_headers, # W3C Trace Contextヘッダー timeout=settings.triton_timeout_ms, ) inject 関数はddtraceが管理する現在のスパンのトレースIDとスパンIDをW3C Trace Context形式のヘッダーに変換します。Triton側はこのヘッダーを読み取り同じトレースIDで新しいスパンを作成します。これによりddtraceで計装されたAPIサーバーとOTelで計装されたTritonの間でトレースが連携されます。本実装ではddtrace 2.21.8を使用しています 1 。 2. Triton専用NodeにDatadog AgentをDaemonSetで配置 TritonはOTel形式でトレースを出力するためOpenTelemetry Protocol(以下、OTLP)形式を受け取れるDatadog Agentが必要です。 KubernetesにおけるDaemonSetはクラスタ内の各NodeでPodが1つずつ稼働することを保証するリソースです。ログ収集やモニタリングエージェントなど各Nodeで実行が必要なシステム機能のデプロイに適しています。次の図にNode内でのトレース送信の流れを示します。 OTelで計装されたTritonからのトレースは同一Node上のDatadog AgentがOTLP形式で受け取りDatadog Backendに送信します。Tritonが稼働するNodeにDaemonSetでDatadog Agentを配置しport 4318でOTLPリクエストを受け取るよう設定しました。以下はその設定例です。 apiVersion : apps/v1 kind : DaemonSet metadata : name : triton-datadog-agent spec : template : spec : affinity : nodeAffinity : requiredDuringSchedulingIgnoredDuringExecution : nodeSelectorTerms : - matchExpressions : - key : cloud.google.com/gke-nodepool operator : In values : - <triton-nodepool-name> containers : - name : datadog-agent image : datadog/agent:7.56.1 env : - name : DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_HTTP_ENDPOINT value : "0.0.0.0:4318" ports : - containerPort : 4318 hostPort : 4318 name : traceporthttp protocol : TCP なお hostPort を使用するとNodeのポートが外部に公開されクラスタへのネットワーク侵入経路となる可能性があるため絶対に必要な場合を除き避けることが推奨されています。本記事の構成ではTritonが稼働するNode上でのみDatadog Agentと通信するために使用しておりプライベートなGKEクラスタ内での利用を想定しています。パブリックなクラスタで利用する場合はファイアウォールルールやネットワークポリシーで適切にアクセス制限を行ってください。 3. Tritonの起動オプションでトレース設定 Triton側では起動オプションでOTelによるトレース出力を有効にしました。 --trace-config level=TIMESTAMPS --trace-config rate=1 --trace-config mode=opentelemetry --trace-config opentelemetry,url=http://$(DD_AGENT_HOST):4318/v1/traces これらの設定によりAPIサーバーからTritonへの流れが1本のトレースとして表示され、スパンごとのボトルネック分析が可能になりました。次の図は本システムで実際にDatadog APMに表示されたトレースです。 APIサーバーとTritonのSpanが1本のトレースとして連携されていることがわかります。 課題3:PyTorchモデルがロードされない Tritonを起動したところPyTorchモデルのロードに失敗してTritonが起動しませんでした。モデルファイルは正しいパスに配置されているにもかかわらず以下のようなエラーが出力されました。 UNAVAILABLE: Internal: failed to load model 'genre_extract_torchscript': PytorchStreamReader failed locating file constants.pkl: file not found このエラーは、通常のPyTorchモデルを、TorchScript形式のモデルとして読み込もうとした場合に発生します。 原因 TritonのPyTorch BackendはTorchScript形式のモデルのみをサポートしています。配置していたモデルはTorchScript形式ではなかったためロードできませんでした。 解決策 PyTorch Backend ではすべてのPyTorchモデルをTorchScript形式に変換する必要があります。本システムでは torch.jit.script を使用してモデルを変換しました。 # Before: PyTorchモデルの重みをそのまま保存 torch.save(model.state_dict(), 'best_model.pt' ) # After: TorchScript形式で保存 jit_script = torch.jit.script(model).eval() jit_script.save(f "{model_dir}/best_model_torchscript.pt" ) この変更によりモデルが正常にロードされました。 導入効果 本節では本番運用を通じて実際に確認できた効果を紹介します。 性能面では同じ総Pod数で目標レイテンシ以内に処理可能なリクエスト数が約38%向上しました。Triton導入前はAPI Pod 4台(CPU)で24rpsが上限でした。導入後はAPI Pod 3台とTriton Pod 1台(計4台、いずれもCPU)で33rpsまで処理できるようになりました。 運用面では config.pbtxt による宣言的な設定管理により設定変更が容易になりました。 項目 Before(Triton導入前) After(Triton導入後) バッチサイズ設定 コード修正が必要 max_batch_size で設定 推論インスタンス数設定 ワーカー数やPod数を調整 instance_group で設定 Dynamic Batching設定 自前実装が必要 dynamic_batching で設定 モデル入出力定義 コード内で暗黙的に定義 input / output で設定 モデルバージョン管理 独自の命名規則で運用 ディレクトリ構造で標準化 またTriton公式のPerf Analyzerにより前処理・後処理の影響を排除した推論単体の性能検証が可能になりました。 スケーラビリティの面ではジャンル診断APIの低レイテンシとスケーラビリティの要件に備えた体制が整いました。APIサーバーとTritonで責務を分離したことで水平スケールや垂直スケールをそれぞれ独立して適用できます。推論がボトルネックになればTritonにGPUを割り当て、前処理がボトルネックになればAPIサーバーのCPUを増強するといった柔軟なリソース配分が可能です。Dynamic Batchingの有効化も設定変更だけで対応できます。 本番稼働から半年以上が経過し大きな障害なく安定稼働しています。社内でもTriton導入を検討するチームがあり本番運用の実績とナレッジを共有できる状態になりました。 導入時の注意点 コスト面の考慮 Tritonを導入する際はコスト面での検討が重要です。リクエスト数が少ない場合、逆にコスト増となる可能性があります。 MLOpsブロックではAPI系サービスの可用性担保のためAPIサーバーを最小3台構成としており、導入前は3台で運用していました。導入後はAPIサーバー3台に加えて可用性担保のためTriton Pod 2台が必要になり合計5台構成となりました。同スペックのPodを使用しているため単純計算で約67%のコスト増です。 一方前述の性能検証で示した通りTriton導入により同Pod数でスループットが約38%向上しています。つまりAPIサーバーのみで同等のスループットを得るには5台以上のPodが必要となる計算です。リクエスト数の増加でスケールアウトが必要な場合、Triton構成はコスト効率が良くなります。 現時点ではサービス開始直後でリクエスト数が少なく最小構成での運用となっているためコスト削減効果があるとは言えません。ただし運用・保守性の向上というメリットは得られています。またリクエスト増加時には同Pod数でスループットが約38%向上している点に加えGPUの活用やDynamic Batchingの有効化によるさらなるコスト効率の改善が見込まれます。 まとめ 本記事ではZOZOマッチのジャンル診断APIにTritonを導入した背景から課題と解決策、導入効果までを紹介しました。 社内初の導入で様々な課題に直面しましたがそれぞれ解決できました。 結果として同Pod数でスループットが約38%向上しconfig.pbtxtによる宣言的管理で運用性が向上しました。またAPIサーバーとTritonで役割を分担することで責務が明確になりました。 導入前は社内での運用実績がなく、トラブル発生時の対応やナレッジ不足を懸念していました。しかし本番稼働から半年以上が経過し大きな障害なく安定稼働しています。社内でもTriton導入を検討するチームがあり本番運用の実績とナレッジを共有できる状態になりました。 本記事で紹介した導入時の知見がこれからTritonの導入を検討している方の参考になれば幸いです。 最後になりますがZOZOでは一緒にサービスを作り上げてくれる方を募集中です。MLOpsブロックでも絶賛採用を行っているためご興味ある方は以下のリンクからぜひご応募ください。 hrmos.co 将来のバージョンではこの手動でのヘッダー埋め込みが不要になる可能性があります。詳細は GitHub Discussion を参照してください。 ↩
動画
該当するコンテンツが見つかりませんでした













