AWS Step Functionsの動的並列処理

記事タイトルとURLをコピーする

この記事は約3分で読めます。

この記事は1年以上前に書かれたものです。
内容が古い可能性がありますのでご注意ください。

はじめに

以前、タスク管理ツールのTrello APIを使って、週次でDoneレーンに移動したカード一覧を取得しSlackへ投稿するLambdaを作成したのですが、処理に時間がかかりLambdaのタイムアウト15分を超えてしまうようになりました。それを解消するのに、Step Functionsの動的並列処理機能を試してみたのでご紹介します。

aws.amazon.com

準備

開発環境

  • Python 3.8.6
  • Serverless Framework
    • Framework Core: 2.18.0
    • Plugin: 4.4.2
    • SDK: 2.3.2
    • Components: 3.4.7

Step Functionsワークフロー図

f:id:swx-tani:20210202121539p:plain

作業

関数をわける

時間がかかっているのが、カードの移動履歴の検索処理だったので、元々2つだったファイルをDoneレーンのカードを全て取得する関数のファイルとカードの移動履歴を条件分岐させる関数のファイル、Slackへ投稿する関数のファイルの3つにわけました。

get_card_from_done.py

def get_done():
"""
Doneレーンのカード100件を1要素のリストにする。
"""
board = client.get_board("ボードID")
list = board.get_list("リストID")
cards = list.list_cards(card_filter="open")
card_lists = []
every_100 = []
for i in range(0, 100):
every_100.append(cards[i].id)
card_lists.append(every_100)
every_100 = []
for i in range(100, 200):
every_100.append(cards[i].id)
card_lists.append(every_100)
every_100 = []
for i in range(200, 300):
every_100.append(cards[i].id)
card_lists.append(every_100)
every_100 = []
for i in range(300, len(cards)):
every_100.append(cards[i].id)
card_lists.append(every_100)
return card_lists
def notify(event, handler):
return_done = get_done()
return json.loads(json.dumps(return_done, default=str))

Step Functionsは、ひとつ前からのinputの値がリストになっていて要素ごとにプロセスが走る仕組みになっています。1つのリストの中に10個の要素がある場合、10個のLambdaが立ち上がり並列処理を行うということです。そのため、カード100件ごとを1要素とするリストになるようにします。

search_history.py

def search_card_history(event, context):
'''
カードの移動履歴を条件分岐させる(100件ごとの動的並列処理)
'''
date = get_card_from_done.get_date() # 現在、1週間後、1週間前の日付を取得する。
current_time = date[0]
one_week_before = date[1]
member_dict = get_card_from_done.associate_id_and_name() # メンバーのIDとフルネームを辞書型にして紐づける。
post_message = ""
for card_list_100 in event:
card = client.get_card(card_list_100)
move_history = card.listCardMove_date()
if len(move_history) == 0:
continue
else:
latests_move_history = move_history[0]
last_move_history = latests_move_history[2]
if one_week_before < last_move_history < current_time:
created_message = get_card_from_done.create_post_message(card, member_dict)
post_message += created_message
return post_message

post_slack.py

def chat_postMessage(event, context):
post_message = "*週次でDoneへ移動したカード一覧*"
card_message = "\n".join(event['input']['post_message'])
post_message += card_message
client.chat_postMessage(
channel=settings.SLACK_POST_CHANNEL,
username=settings.SLACK_POST_USERNAME,
icon_emoji=settings.SLACK_POST_EMOJI,
text="",
attachments=[
{
"text": post_message,
"fallback": "カードを表示できません",
"callback_id": "test_card",
"color": "#0b6dd6",
"attachment_type": "default"
}
]
)

Step Functions デプロイ

serverless.yml

service: サービス名
custom:
config:
accountId: AWSアカウントID
prune:
automatic: true
number: 1
region: ${opt:region, self:provider.region}
provider:
name: aws
runtime: python3.8
lambdaHashingVersion: 20201221
profile: default
region: ap-northeast-1
stage: prod
package:
exclude:
- lib/**
functions:
cardlist:
handler: get_card_from_done.notify
timeout: 300
cardhistory:
handler: search_history.search_card_history
timeout: 900
postslack:
handler: post_slack.chat_postMessage
timeout: 300
plugins:
- serverless-step-functions
- serverless-prune-plugin
- serverless-python-requirements
stepFunctions:
stateMachines:
hellostepfunc1:
name: outputDoneCard
events:
- http:
path: create
method: POST
definition:
Comment: "output card moved by weekly"
StartAt: GetCardFromDone
States:
GetCardFromDone:
Type: Task
Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-cardlist"
InputPath: $
ResultPath: $.result
Next: HistorySearchMap
HistorySearchMap:
Type: Map
InputPath: $
ItemsPath: $.result
Iterator:
StartAt: HistorySearch
States:
HistorySearch:
Type: Task
Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-cardhistory"
End: True
ResultPath: $.input.post_message
Next: PostSlack
PostSlack:
Type: Task
Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-postslack"
End: true

InputPathは、入力フィルタとして指定された部分がLambdaに入力されます。

ResultPathは、実行結果の格納先を指定します。

ItemsPathはMapで使用され、その項目を入力として反復に渡します。

$.data のように指定します。 「$」を指定すると、入力データ全体が引き渡されます。

f:id:swx-tani:20210202154553p:plain

post_messageのデータを指定したい場合は、$.input.post_messageです。 マネジメントコンソールを確認しながら指定していきます。

これで終わりです。 デプロイして実行を開始すると並列処理により、タイムアウトエラーにならずにカードの一覧が投稿されます。

まとめ

コードは以前のものを使ったので、作業としてはこれだけです。とても簡単に実践できました。

もちろんAmazon CloudWatch Eventsを使ってスケジュール実行もできるので、処理に時間のかかるワークフローをStep Functionsを使って並列処理に変えてみてはいかがでしょうか。

最後まで読んでいただき、ありがとうございました。