この記事は約3分で読めます。
はじめに
こんにちは、中途入社してもうすぐ半年になります、アプリケーションサービス部 ディベロップメントサービス1課の北出です。
今回は、Cognitoによる初回パスワード変更の強制や、Cognitoから発行されるAWSリソースへアクセスするための一時認証情報の取得をPython(Boto3)で実装しましたので、ざっくりとですが方法を紹介します。
クライアント側の実装なのでJavaScriptやTypeScriptのほうが需要がありそうですが、私はあまりそちらに明るくないのでPythonになります。
システム構成
本記事では以下のようなシステム構成を想定しています。

今回の認証の流れ
本記事の大まかな流れは以下になります。
- ユーザープールの作成(マネジメントコンソール)
- ユーザーの作成(マネジメントコンソール)
- マネコンから管理者権限でユーザーを作成することでユーザーのログイン時にパスワード変更を強制します。

- IDプールの作成(マネジメントコンソール)
- 初回パスワード変更
- ログイン
- 一時認証情報の取得
- AWSリソースへのアクセス
認証方式はこちらのドキュメントのクライアントの認証フローを参考にしてください。
準備
ユーザープールの作成
以下の流れでマネジメントコンソールからユーザープールを作成してください
- Cognitoのページへ行きます
- ユーザープール→「ユーザープールを作成」を選択します
- 画像のように以下の部分を設定します
- アプリケーションタイプ:シングルページアプリケーション
- サインイン識別子のオプション:ユーザー名
- サインアップのための必須属性:email
- 作成します
ユーザーの作成
ユーザープールを作成したら引き続き、マネジメントコンソールからユーザーを作成します。
- ユーザープールから作成したユーザープールを選択します
- 左側ダッシュボードの「ユーザー」から、「ユーザーを作成」を選択します
- 画像のように以下の部分を設定します
- Eメールで招待を送信
- ユーザー名
- メールアドレス
- 仮パスワード:仮パスワードを生成
- 作成します
作成されると、設定したメールアドレスに自動生成された仮パスワードが送られてくるはずです。
IDプールの作成
ユーザープールを作成することで、だれがアクセスしてきているかの認証の基盤が作られます。
認証しただけでは、ユーザーがどのAWSリソースにアクセスしていいかの権限の付与ができていません。この認可の基盤であるIDプールを作成します。
- Cognitoのページに行きます
- IDプール→「IDプールを作成」を選択します
- IDプールの設定をします
- 認証
- ユーザーアクセス:認証されたアクセス
- 認証されたIDソース:Amazon Cognito ユーザープール
- IAMロール
- 新しいIAMロールを作成
- 本記事ではS3バケットへのアップロードを行いますので、アップロードをする権限を付与してください
- IDプロバイダーを接続
- ユーザープールID:作成したユーザープール
- アプリクライアントID:ユーザープール作成時にデフォルトで作成されているものを選択
- 他の設定はデフォルトで問題ありません。
- 作成します
実装
ここまでの手順で最低限必要なものはそろっているはずなのでここからPythonプログラムに移っていきます。
プログラム内でユーザープールIDやアプリクライアントIDなどのパラメータを使用しますが、マネジメントコンソール上やAWS CLI で取得できるため、それらの取得手順は省略します。
ディレクトリ構成
本記事で実装するディレクトリ構成は以下になります。
.
├── __init__.py
├── change_initial_password.py
├── config.ini
├── s3_upload.py
├── libs
│ ├── __init__.py
│ ├── get_access_token.py
│ ├── logger_config.py
│ ├── session_manager.py
│ └── srp.py
初回パスワード変更
このステップで実装する部分は以下になります。
change_initial_password.py
config.ini
- CognitoユーザープールIDや一時パスワード、変更後のパスワードを定義し格納します。ファイルの中身は本記事では記載しませんので、各Pythonファイルから必要なものを判断し作成してください。
libs/srp.py
- Cognitoの認証で必要なSRP値の計算をするプログラムです。
以下で各プログラムの中身を記述します。
libs/srp.py
import base64
import binascii
import datetime
import hashlib
import hmac
import os
import re
import boto3
import six
init_N = (
"FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1"
+ "29024E088A67CC74020BBEA63B139B22514A08798E3404DD"
+ "EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245"
+ "E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED"
+ "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D"
+ "C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F"
+ "83655D23DCA3AD961C62F356208552BB9ED529077096966D"
+ "670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B"
+ "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9"
+ "DE2BCBF6955817183995497CEA956AE515D2261898FA0510"
+ "15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64"
+ "ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7"
+ "ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B"
+ "F12FFA06D98A0864D87602733EC86A64521F2B18177B200C"
+ "BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31"
+ "43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF"
)
g_hex = "2"
info_bits = bytearray("Caldera Derived Key", "utf-8")
def hex_to_long(hex_string):
"""
16 進文字列を 10 進整数に変換する
"""
return int(hex_string, 16)
def long_to_hex(long_num):
"""
10 進整数を 16 進文字列に変換する
"""
return f"{long_num:x}"
def pad_hex(long_int):
"""
16進文字列に変換してから適宜先頭に 0 または 00 を補う
PAD に相当
"""
if not isinstance(long_int, six.string_types):
hash_str = long_to_hex(long_int)
else:
hash_str = long_int
if len(hash_str) % 2 == 1:
hash_str = f"0{hash_str}"
elif hash_str[0] in "89ABCDEFabcdef":
hash_str = f"00{hash_str}"
return hash_str
def hash_sha256(buf):
"""
バイト列を sha256 でハッシュ化してから 16 進文字列に変換
64 文字に満たない場合は先頭を 0 で埋める
"""
a = hashlib.sha256(buf).hexdigest()
return (64 - len(a)) * "0" + a
def hex_hash(hex_string):
"""
16 進文字列を 64 桁のハッシュに変換する
"""
return hash_sha256(bytearray.fromhex(hex_string))
def calculate_u(A, B):
"""
u = SHA1(PAD(A) | PAD(B)) を計算する
SHA1 じゃなくて SHA256 かもしれない
"""
u_hex_hash = hex_hash(pad_hex(A) + pad_hex(B))
return hex_to_long(u_hex_hash)
def compute_hkdf(ikm, salt):
"""
キー導出関数を計算する
先頭 16 バイト
"""
prk = hmac.new(salt, ikm, hashlib.sha256).digest()
info_bits_update = info_bits + bytearray(chr(1), "utf-8")
hmac_hash = hmac.new(prk, info_bits_update, hashlib.sha256).digest()
return hmac_hash[:16]
def get_now_string():
"""
現在時刻を取得する
'Sun Apr 9 07:46:12 UTC 2022'
のように、day が一桁の場合は十の位をゼロにする点に注意
"""
return re.sub(
r" 0(\d) ",
r" \1 ",
datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y"),
)
class CognitoSRP:
def __init__(self, username, password, pool_id, client_id, client):
self.username = username
self.password = password
self.pool_id = pool_id
self.client_id = client_id
self.client = client if client else boto3.client("cognito-idp")
self.N = hex_to_long(init_N)
self.g = hex_to_long(g_hex)
self.k = hex_to_long(
hex_hash("00" + init_N + "0" + g_hex)
) # k = SHA1(N | PAD(g))
self.small_a_value = self.generate_random_small_a()
self.A = self.calculate_a()
def generate_random_small_a(self):
"""
A = g^a % N の a を計算する
"""
hex_random = binascii.hexlify(os.urandom(128))
rand_int = hex_to_long(hex_random)
small_a_int = rand_int % self.N
return small_a_int
def calculate_a(self):
"""
A = g^a % N を計算する
"""
A = pow(self.g, self.small_a_value, self.N)
if A % self.N == 0:
raise ValueError("Illegal paramater. A mod N cannot be 0.")
return A
def get_auth_params(self):
"""
InitiateAuth API の引数
"""
auth_params = {"USERNAME": self.username, "SRP_A": long_to_hex(self.A)}
return auth_params
def authenticate_user(self):
"""
USER_SRP_AUTH の実行
"""
auth_params = self.get_auth_params()
response = self.client.initiate_auth(
AuthFlow="USER_SRP_AUTH",
AuthParameters=auth_params,
ClientId=self.client_id,
)
if response["ChallengeName"] == "PASSWORD_VERIFIER":
challenge_response = self.get_challenge_response(
response["ChallengeParameters"]
)
response = self.client.respond_to_auth_challenge(
ClientId=self.client_id,
ChallengeName="PASSWORD_VERIFIER",
ChallengeResponses=challenge_response,
)
return response
def get_challenge_response(self, challenge_parameters):
"""
PASSWORD_VERIFIER_CHALLENGE に必要なレスポンスを生成する
"""
timestamp = get_now_string()
user_id_for_srp = challenge_parameters["USER_ID_FOR_SRP"]
server_b_value = hex_to_long(challenge_parameters["SRP_B"])
salt = hex_to_long(challenge_parameters["SALT"])
secret_block_b64 = challenge_parameters["SECRET_BLOCK"]
secret_block_bytes = base64.standard_b64decode(secret_block_b64)
hkdf = self.get_password_authentication_key(
user_id_for_srp, self.password, server_b_value, salt
)
msg = (
bytearray(self.pool_id.split("_")[1], "utf-8")
+ bytearray(user_id_for_srp, "utf-8")
+ bytearray(secret_block_bytes)
+ bytearray(timestamp, "utf-8")
)
hmac_hash = hmac.new(hkdf, msg, hashlib.sha256).digest()
signature_string = base64.standard_b64encode(hmac_hash)
challenge_response = {
"TIMESTAMP": timestamp,
"USERNAME": user_id_for_srp,
"PASSWORD_CLAIM_SECRET_BLOCK": secret_block_b64,
"PASSWORD_CLAIM_SIGNATURE": signature_string.decode("utf-8"),
}
return challenge_response
def get_password_authentication_key(self, username, password, server_b_value, salt):
"""
ハッシュ化用のキーを生成する
"""
if server_b_value % self.N == 0:
raise ValueError("B cannot be zero.")
u_value = calculate_u(self.A, server_b_value)
if u_value == 0:
raise ValueError("U cannot be zero.")
username_password = "{0}{1}:{2}".format(
self.pool_id.split("_")[1], username, password
)
username_password_hash = hash_sha256(bytearray(username_password, "utf-8"))
# username_password_hash = hash_sha256(username_password.encode('utf-8')) # 同じ
x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash))
int_value2 = server_b_value - self.k * pow(self.g, x_value, self.N)
# (B - (k * g^x)) ^ (a + (u * x)) % N を実行している
s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.N)
hkdf = compute_hkdf(
bytearray.fromhex(pad_hex(s_value)),
bytearray.fromhex(pad_hex(long_to_hex(u_value))),
)
return hkdf
このコードはこちらを使わせていただいています。
SRPとは Secure Remote Password の略のプロトコルで、詳しい説明は省きますが、パスワードを直接サーバーに送信しないので盗聴などに強いものと認識してもらえれば大丈夫です。
change_initial_password.py
import configparser
import boto3
from libs.srp import CognitoSRP
config = configparser.ConfigParser()
config.read("config.ini")
USER_POOL_ID = config["DEFAULT"]["USER_POOL_ID"]
CLIENT_ID = config["DEFAULT"]["CLIENT_ID"]
USERNAME = config["DEFAULT"]["USERNAME"]
TEMPORARY_PASSWORD = config["DEFAULT"]["TEMPORARY_PASSWORD"]
NEW_PASSWORD = config["DEFAULT"]["NEW_PASSWORD"]
cognito_idp = boto3.client("cognito-idp")
def authenticate_user():
srp = CognitoSRP(
username=USERNAME,
password=TEMPORARY_PASSWORD,
pool_id=USER_POOL_ID,
client_id=CLIENT_ID,
client=cognito_idp,
)
srp_a = srp.get_auth_params()["SRP_A"]
response = cognito_idp.initiate_auth(
AuthFlow="USER_SRP_AUTH",
AuthParameters={"SRP_A": srp_a, "USERNAME": USERNAME},
ClientId=CLIENT_ID,
)
print("initiate_auth_response:", response)
challenge_parameters = response["ChallengeParameters"]
challenge_response = srp.get_challenge_response(challenge_parameters)
response = cognito_idp.respond_to_auth_challenge(
ClientId=CLIENT_ID,
ChallengeName="PASSWORD_VERIFIER",
ChallengeResponses=challenge_response,
)
print(f"Authentication successful: {response}")
return response
def respond_to_new_password_challenge(client_id:str, user_name: str,session: str, new_password: str) -> dict:
"""
Change the password of the user.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp/client/respond_to_auth_challenge.html
Args:
client_id: cognito のクライアントID
user_name: ユーザー名
session: セッショントークン
new_password: 新しいパスワード
Returns:
dict: パスワード変更のレスポンス
"""
print(f"Session: {session}")
response = cognito_idp.respond_to_auth_challenge(
ClientId=client_id,
ChallengeName="NEW_PASSWORD_REQUIRED",
ChallengeResponses={
"USERNAME": user_name,
"NEW_PASSWORD": new_password,
},
Session=session,
)
print(f"respond_to_auth_challenge_response: {response}")
return response
def authenticate_and_change_password(new_password):
response = authenticate_user()
if (
"ChallengeName" in response
and response["ChallengeName"] == "NEW_PASSWORD_REQUIRED"
):
session = response["Session"]
change_password_response = respond_to_new_password_challenge(CLIENT_ID, USERNAME, session, new_password)
print("Password changed successfully:", change_password_response)
else:
print("Authentication successful, no password change required.")
def main():
authenticate_and_change_password(NEW_PASSWORD)
if __name__ == "__main__":
main()
無事パスワードが変更できていれば、マネジメントコンソールでCognitoのユーザーステータスが確認済みになっているはずです。
一時認証情報の取得
このステップで実装する部分は以下になります。
libs/get_access_token.py
- ユーザー名とパスワードからAWSリソースへアクセスするための一時認証情報を取得するための関数です。この関数をつかった実装は次のステップで記述します。
libs/get_access_token.py
import configparser
import boto3
import jwt
from botocore.exceptions import ClientError
from libs.srp import CognitoSRP
config = configparser.ConfigParser()
config.read("config.ini")
AWS_REGION = config["DEFAULT"]["AWS_REGION"]
USER_POOL_ID = config["DEFAULT"]["USER_POOL_ID"]
CLIENT_ID = config["DEFAULT"]["CLIENT_ID"]
USERNAME = config["DEFAULT"]["USERNAME"]
TEMPORARY_PASSWORD = config["DEFAULT"]["TEMPORARY_PASSWORD"]
NEW_PASSWORD = config["DEFAULT"]["NEW_PASSWORD"]
IDPOOL_ID = config["DEFAULT"]["IDPOOL_ID"]
REFRESH_TOKEN = config["DEFAULT"]["REFRESH_TOKEN"]
USER_ID = config["DEFAULT"]["USER_ID"]
cognito_idp = boto3.client("cognito-idp")
def authenticate_user():
srp = CognitoSRP(
username=USERNAME,
password=NEW_PASSWORD,
pool_id=USER_POOL_ID,
client_id=CLIENT_ID,
client=cognito_idp,
)
srp_a = srp.get_auth_params()["SRP_A"]
response = cognito_idp.initiate_auth(
AuthFlow="USER_SRP_AUTH",
AuthParameters={"SRP_A": srp_a, "USERNAME": USERNAME},
ClientId=CLIENT_ID,
)
challenge_parameters = response["ChallengeParameters"]
challenge_response = srp.get_challenge_response(challenge_parameters)
response = cognito_idp.respond_to_auth_challenge(
ClientId=CLIENT_ID,
ChallengeName="PASSWORD_VERIFIER",
ChallengeResponses=challenge_response,
)
return (
response["AuthenticationResult"]["AccessToken"],
response["AuthenticationResult"]["RefreshToken"],
response["AuthenticationResult"]["IdToken"],
)
def get_identity_id(id_token, identity_pool_id):
id_provider = boto3.client("cognito-identity")
response = id_provider.get_id(
IdentityPoolId=identity_pool_id,
Logins={f"cognito-idp.{AWS_REGION}.amazonaws.com/{USER_POOL_ID}": id_token},
)
return response["IdentityId"], id_provider
AWSリソースへのアクセス
このステップで実装する部分は以下になります。
libs/session_manager.py
- 前ステップで作成した関数をつかって、一時認証情報を取得し、Boto3セッションを作成します。
s3_upload.py
- Boto3セッションからS3クライアントを作成し、S3バケットのファイルをアップロードします。
libs/session_manager.py
import boto3
import configparser
import libs.get_access_token as get_access_token
from libs.logger_config import setup_logger
# ロガーの設定
logger = setup_logger("session_manager")
# 設定ファイルの読み込み
config = configparser.ConfigParser()
config.read("config.ini")
AWS_REGION = config["DEFAULT"]["AWS_REGION"]
USER_POOL_ID = config["DEFAULT"]["USER_POOL_ID"]
IDPOOL_ID = config["DEFAULT"]["IDPOOL_ID"]
def create_session():
"""Cognito認証を行い、AWSの一時セッションを作成する"""
# ユーザープールで認証
accessToken, refreshToken, id_token = get_access_token.authenticate_user()
# Identity Poolを使って一時認証情報を取得
identity_id, id_provider = get_access_token.get_identity_id(id_token, IDPOOL_ID)
# 一時認証情報を取得
response = id_provider.get_credentials_for_identity(
IdentityId=identity_id,
Logins={f"cognito-idp.{AWS_REGION}.amazonaws.com/{USER_POOL_ID}": id_token},
)
credentials = response["Credentials"]
# Boto3セッション作成
session = boto3.Session(
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretKey"],
aws_session_token=credentials["SessionToken"],
)
return session, identity_id
s3_upload.py
import configparser
import libs.session_manager as session_manager
from libs.logger_config import setup_logger
# ロガーの設定
logger = setup_logger("s3_upload")
"""
S3アップロードするプログラム
"""
# 設定ファイルの読み込み
config = configparser.ConfigParser()
config.read("config.ini")
S3_REQUEST_BUCKET = config["DEFAULT"]["S3_REQUEST_BUCKET"]
LOCAL_FILE_PATH = config["DEFAULT"]["LOCAL_FILE_PATH"]
# セッションの取得
session, identity_id = session_manager.create_session()
# S3クライアントの作成
s3 = session.client("s3")
# S3アップロード情報
s3_bucket = S3_REQUEST_BUCKET
s3_key = f"{identity_id}/input.txt"
local_file_path = LOCAL_FILE_PATH
# S3にファイルをアップロード
response = s3.upload_file(local_file_path, s3_bucket, s3_key)
logger.info(f"Uploaded {local_file_path} to s3://{s3_bucket}/{s3_key}")
アップロード先のS3バケットのプレフィックスにidentity_idを指定しています。これは、ユーザーごとに割り当てられる一意のIDです。
例えば、複数のユーザーがいるときに、自分以外のユーザーのバケットにアクセスできないようにするといった制御をするときに使えると考えています。
このあたりは次回以降の記事で説明できればと思います。
おわりに
業務でBoto3でCognitoの認証を実装する必要があったのですが、一通りの流れが書かれた記事があまりなかったので今回記事にしてみました。(単にpythonでの需要がないだけの可能性もありますが)
何かの参考になれば幸いです。
北出 宏紀(執筆記事の一覧)
アプリケーションサービス本部ディベロップメントサービス1課
2024年9月中途入社です。
毎朝1時間資格勉強継続中です。