![]() |
こんにちは、SCSK北村です。
Amazon EC2 インスタンスのリソースデータを取得するために、Datadog API を使ってみました。
結論として、期間を指定してCPUの使用率データをCSVファイルで出力することができました。
今回は、備忘も兼ねてファイル出力までの流れをまとめたいと思います。
事前準備
実行環境は以下の通りです。
OS:Windows
プログラミング言語:Python 3.12.0
Datadog API を使用するために必要なライブラリをインストールします。
対象:datadog-api-client
※Python3.7以上で利用可能
pip install datadog-api-client
実際にやったこと
基本的には下記リンク[※1]を見ながらスクリプトを作成していきました。
手順を追って紹介しますが、全体のコードをまず見たい方は読み飛ばしてください。
APIキーとAPPキーの準備
Datadog APIを利用する際、APIキーとAPPキーによる認証が必要になります。
今回、YAML形式で別ファイルに認証情報を記載し、プログラムから読み取っています。
datadog:
api_key: <ここにAPI KEYを入力>
app_key: <ここにAPP KEYを入力>
リクエストで送信するクエリを作成
リソースデータの取得には、TimeseriesFormulaQueryRequestメソッドを使用して、まずリクエストボディを作成します。
公式リファレンス[※1]にサンプルコードが掲載されているので、要件に合わせてボディをカスタマイズしていきます。
今回はCPU使用率データの取得ということで、以下のようにカスタマイズしました。
クエリを任意のものに書き換えれば、メモリ使用率やディスク使用率などのDatadogで収集しているデータは取得可能です!
Datadog のメトリクスエクスプローラーを使用すると、クエリを色々と試せるので便利です。
# リクエストで送信するクエリを定義
QUERY1 = 'avg:system.cpu.idle{name:'+target+'} by {name}' #EC2のNameタグ(target)でフィルタリング
# リクエストボディ
body = TimeseriesFormulaQueryRequest(
data=TimeseriesFormulaRequest(
attributes=TimeseriesFormulaRequestAttributes(
formulas=[
QueryFormula(
formula='100 - q1',
limit=FormulaLimit(
count=10,
order=QuerySortOrder.DESC,
),
),
],
_from=time_from*1000, #データ取得の始点(UNIX時間)
interval=300000, #データ取得間隔(ミリ秒)
queries=TimeseriesFormulaRequestQueries(
[
MetricsTimeseriesQuery(
data_source=MetricsDataSource.METRICS,
query=QUERY1,
name='q1',
),
]
),
to=time_to*1000, #データ取得の終点(UNIX時間)
),
type=TimeseriesFormulaRequestType.TIMESERIES_REQUEST,
),
)
Datadog APIへリクエスト送信
作成したリクエストボディをセットして、Datadog APIにリクエストを送信します。
その際に、前述の認証情報が必要なので、あわせてセットします。
問題なければ、response 変数にDatadog APIからのレスポンスデータが格納されます。
configuration = Configuration()
configuration.unstable_operations['query_timeseries_data'] = True
# Datadog API にリクエスト送信
with ApiClient(configuration) as api_client:
api_client.default_headers['DD-API-KEY'] = api_key
api_client.default_headers['DD-APPLICATION-KEY'] = app_key
api_instance = MetricsApi(api_client)
response = api_instance.query_timeseries_data(body=body)
レスポンスデータを整形してCSVに書き出す
まず、ファイルに書き込みたい情報をレスポンスデータから取得します。
データは、”<レスポンスデータのキー>.value”で取り出すことができました。
# ホスト名を取得
res_series = response['data']['attributes']['series'].value
group_tag = res_series[0]['group_tags'].value[0].split(':')
hostname = group_tag[-1]
# リソースデータのリストを取得
res_values = response['data']['attributes']['values'].value
# 時間データのリストを取得
res_times = response['data']['attributes']['times'].value
レスポンスデータに含まれる時間はUNIX時間なので、日本時間へ変換します。
# UNIX時間を日本時間に変換した結果を格納するリスト
timeseries = []
# UNIX時間を秒に変換 (ミリ秒から秒に変換)
for epoch_time in res_times:
epoch_time_sec = epoch_time / 1000
# UNIX時間を日本時間に変換
jst = datetime.datetime.fromtimestamp(
epoch_time_sec,
datetime.timezone(datetime.timedelta(hours=9))
)
# 指定された形式で日本時間をフォーマット
jst_formatted = jst.strftime('%Y/%m/%d %H:%M')
timeseries.append(jst_formatted)
複数のEC2インスタンスのデータを取得するので、インスタンス毎にファイルを分けて出力するようにしています。
CSVファイルの中身は、1列目が「時間」、2列目が「CPU使用率の値」としています。
# hostname ごとにデータをまとめる
grouped_data = {}
grouped_data[hostname] = {'values': res_values[0]}
# ファイル保存先
SAVE_DIR = f'result/{dt}/{TARGET_RESOURCE}/'
# SAVE_DIR が存在しなかったら作成
if not os.path.exists(SAVE_DIR):
os.makedirs(SAVE_DIR)
# データをCSVファイルに書き込む
for group_data in grouped_data.values():
values = group_data['values']
filepath = SAVE_DIR + f'{hostname}_CPU.csv'
with open(filepath, mode='w', newline='') as csvfile:
writer = csv.writer(csvfile)
for i in range(len(timeseries)):
row = [timeseries[i], values.value[i]]
writer.writerow(row)
作成したコード
- cpu_util.py
今回作成したコードの全体です。
import os
import csv
import datetime
import logging.config
import yaml
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v2.api.metrics_api import MetricsApi
from datadog_api_client.v2.model.formula_limit import FormulaLimit
from datadog_api_client.v2.model.metrics_data_source import MetricsDataSource
from datadog_api_client.v2.model.metrics_timeseries_query import MetricsTimeseriesQuery
from datadog_api_client.v2.model.query_formula import QueryFormula
from datadog_api_client.v2.model.query_sort_order import QuerySortOrder
from datadog_api_client.v2.model.timeseries_formula_query_request import TimeseriesFormulaQueryRequest
from datadog_api_client.v2.model.timeseries_formula_request import TimeseriesFormulaRequest
from datadog_api_client.v2.model.timeseries_formula_request_attributes import TimeseriesFormulaRequestAttributes
from datadog_api_client.v2.model.timeseries_formula_request_queries import TimeseriesFormulaRequestQueries
from datadog_api_client.v2.model.timeseries_formula_request_type import TimeseriesFormulaRequestType
# 環境設定
CONFIG = 'config/credentials.yaml'
EC2_TARGETS = 'config/target_ec2_list.yaml'
TARGET_RESOURCE = 'cpu'
LOG_DIR = 'log/'
# LOG_DIR が存在しなかったら作成
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
with open('config/log_config.yaml', 'r') as config_file:
config = yaml.safe_load(config_file)
logging.config.dictConfig(config)
logger = logging.getLogger('cpu')
def get_cpu_utilization(api_key, app_key, time_from, time_to, target, dt):
try:
# リクエストで送信するクエリを定義
QUERY1 = 'avg:system.cpu.idle{name:'+target+'} by {name}'
# リクエストボディ
body = TimeseriesFormulaQueryRequest(
data=TimeseriesFormulaRequest(
attributes=TimeseriesFormulaRequestAttributes(
formulas=[
QueryFormula(
formula='100 - q1',
limit=FormulaLimit(
count=10,
order=QuerySortOrder.DESC,
),
),
],
_from=time_from*1000,
interval=300000,
queries=TimeseriesFormulaRequestQueries(
[
MetricsTimeseriesQuery(
data_source=MetricsDataSource.METRICS,
query=QUERY1,
name='q1',
),
]
),
to=time_to*1000,
),
type=TimeseriesFormulaRequestType.TIMESERIES_REQUEST,
),
)
configuration = Configuration()
configuration.unstable_operations['query_timeseries_data'] = True
# Datadog API にリクエスト送信
with ApiClient(configuration) as api_client:
api_client.default_headers['DD-API-KEY'] = api_key
api_client.default_headers['DD-APPLICATION-KEY'] = app_key
api_instance = MetricsApi(api_client)
response = api_instance.query_timeseries_data(body=body)
# ホスト名を取得
res_series = response['data']['attributes']['series'].value
group_tag = res_series[0]['group_tags'].value[0].split(':')
hostname = group_tag[-1]
# リソースデータのリストを取得
res_values = response['data']['attributes']['values'].value
# 時間データのリストを取得
res_times = response['data']['attributes']['times'].value
# UNIX時間を日本時間に変換した結果を格納するリスト
timeseries = []
# UNIX時間を秒に変換 (ミリ秒から秒に変換)
for epoch_time in res_times:
epoch_time_sec = epoch_time / 1000
# UNIX時間を日本時間に変換
jst = datetime.datetime.fromtimestamp(
epoch_time_sec,
datetime.timezone(datetime.timedelta(hours=9))
)
# 指定された形式で日本時間をフォーマット
jst_formatted = jst.strftime('%Y/%m/%d %H:%M')
timeseries.append(jst_formatted)
# hostname ごとにデータをまとめる
grouped_data = {}
grouped_data[hostname] = {'values': res_values[0]}
# ファイル保存先
SAVE_DIR = f'result/{dt}/{TARGET_RESOURCE}/'
# SAVE_DIR が存在しなかったら作成
if not os.path.exists(SAVE_DIR):
os.makedirs(SAVE_DIR)
# データをCSVファイルに書き込む
for group_data in grouped_data.values():
values = group_data['values']
# ファイル保存先パス・ファイル名
filepath = SAVE_DIR + f'{hostname}_CPU.csv'
with open(filepath, mode='w', newline='') as csvfile:
writer = csv.writer(csvfile)
for i in range(len(timeseries)):
row = [timeseries[i], values.value[i]]
writer.writerow(row)
logger.info(f"データが書き込まれました: '{filepath}'")
except Exception as e:
# 例外処理
logger.error(f'予期せぬエラーが発生しました: {str(e)}')
else:
# 処理成功
logger.info(f'[{target}] の処理が完了しました')
def main():
try:
logger.info(f'リソースデータ[{TARGET_RESOURCE}] の取得を開始します')
# 設定ファイルを読み込む
with open(CONFIG, 'r') as config_file:
config = yaml.safe_load(config_file)
# APIキーとAPPキーを取得
api_key = config['datadog']['api_key']
app_key = config['datadog']['app_key']
# 設定ファイルを読み込む
with open(EC2_TARGETS, 'r') as ec2_file:
ec2 = yaml.safe_load(ec2_file)
# データ取得する対象を読み込む
target_list = ec2['hostname']
# 現在の日付を取得
today = datetime.date.today()
# 当月の初日を計算
first_day_of_current_month = datetime.datetime(today.year, today.month, 1)
# 前月の初日を計算
if today.month == 1:
# 1月の場合、前年の12月の初日になる
first_day_of_last_month = datetime.datetime(today.year - 1, 12, 1)
dt = str(today.year - 1) + str(12) # for 'SAVE_DIR' path
else:
# それ以外の場合、前月の初日になる
first_day_of_last_month = datetime.datetime(today.year, today.month - 1, 1)
dt = str(today.year) + str(today.month - 1) # for 'SAVE_DIR' path
# UNIX時間に変換
time_from = int(first_day_of_last_month.timestamp()) # 前月初日
time_to = int(first_day_of_current_month.timestamp()) # 当月初日
# 対象毎にループ処理
for target in target_list:
logger.info(f'[{target}] の処理を開始します')
get_cpu_utilization(api_key, app_key, time_from, time_to, target, dt)
except FileNotFoundError as e:
# ファイルが存在しない場合の例外処理
logger.error(f'設定ファイルが見つかりません: {str(e)}')
except IOError as e:
# その他の入出力関連のエラーに対する例外処理
logger.error(f'入出力エラーが発生しました: {str(e)}')
except Exception as e:
# その他の例外に対する例外処理
logger.error(f'予期せぬエラーが発生しました: {str(e)}')
else:
# 処理成功
logger.info(f'リソースデータ[{TARGET_RESOURCE}] の取得が完了しました')
if __name__ == '__main__':
logger.info('START SCRIPT')
main()
logger.info('END SCRIPT')
- credentials.yaml
Datadog APIにリクエストを送信する際の認証情報をここに記載します。
datadog:
api_key: <ここにAPI KEYを入力>
app_key: <ここにAPP KEYを入力>
- log_config.yaml
loggingモジュールでログを出力させるための設定ファイルです。
ターミナルとファイルのそれぞれに出力できるように記載しています。
version: 1
formatters:
simple_fmt:
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
datefmt: '%Y/%m/%d %H:%M:%S'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple_fmt
stream: ext://sys.stdout
file:
class: logging.handlers.TimedRotatingFileHandler
level: DEBUG
formatter: simple_fmt
filename: log/app.log
encoding: utf-8
when: midnight
interval: 30
backupCount: 3
loggers:
cpu:
level: INFO
handlers: [console, file]
propagate: no
root:
level: DEBUG
handlers: [console, file]
- target_ec2_list.yaml
データ取得対象のEC2インスタンスをこのファイルに記載し、プログラムから取得しています。
# EC2インスタンスのNameタグの値を以下に記載
hostname:
- server01
- server02
- server03
- 出力ファイルのサンプル
今回作成したプログラムでは以下のように出力されます。
プログラムを実行した日付から、前月1か月分のCPU使用率が5分間隔で取得することができました。
2023/10/01 00:00,9.209855
2023/10/01 00:05,8.721778
2023/10/01 00:10,8.922941
2023/10/01 00:15,8.763545
2023/10/01 00:20,8.784494
2023/10/01 00:25,8.795538
2023/10/01 00:30,8.714313
2023/10/01 00:35,8.817114
2023/10/01 00:40,8.730093
2023/10/01 00:45,8.940528
2023/10/01 00:50,8.863131
2023/10/01 00:55,8.967478
・・・
あとがき
運用業務の中でサーバリソースの使用状況を確認する場面が出てくると思います。
取得するサーバ台数やメトリクスが多くなると、コンソールのGUI操作でデータ取得作業は結構手間がかかりますよね。
今回、Datadog APIを使用して自動化ができたことで、この運用がかなり楽になりました。
今後はAWS Lambdaへの移行も検討していきたいと考えています。
本記事が皆様のお役に立てれば幸いです。