この記事は約3分で読めます。
この記事は1年以上前に書かれたものです。
内容が古い可能性がありますのでご注意ください。
はじめに
以前、タスク管理ツールのTrello APIを使って、週次でDoneレーンに移動したカード一覧を取得しSlackへ投稿するLambdaを作成したのですが、処理に時間がかかりLambdaのタイムアウト15分を超えてしまうようになりました。それを解消するのに、Step Functionsの動的並列処理機能を試してみたのでご紹介します。
準備
開発環境
- Python 3.8.6
- Serverless Framework
- Framework Core: 2.18.0
- Plugin: 4.4.2
- SDK: 2.3.2
- Components: 3.4.7
Step Functionsワークフロー図
作業
関数をわける
時間がかかっているのが、カードの移動履歴の検索処理だったので、元々2つだったファイルをDoneレーンのカードを全て取得する関数のファイルとカードの移動履歴を条件分岐させる関数のファイル、Slackへ投稿する関数のファイルの3つにわけました。
get_card_from_done.py
def get_done():"""Doneレーンのカード100件を1要素のリストにする。"""board = client.get_board("ボードID")list = board.get_list("リストID")cards = list.list_cards(card_filter="open")card_lists = []every_100 = []for i in range(0, 100):every_100.append(cards[i].id)card_lists.append(every_100)every_100 = []for i in range(100, 200):every_100.append(cards[i].id)card_lists.append(every_100)every_100 = []for i in range(200, 300):every_100.append(cards[i].id)card_lists.append(every_100)every_100 = []for i in range(300, len(cards)):every_100.append(cards[i].id)card_lists.append(every_100)return card_listsdef notify(event, handler):return_done = get_done()return json.loads(json.dumps(return_done, default=str))
Step Functionsは、ひとつ前からのinputの値がリストになっていて要素ごとにプロセスが走る仕組みになっています。1つのリストの中に10個の要素がある場合、10個のLambdaが立ち上がり並列処理を行うということです。そのため、カード100件ごとを1要素とするリストになるようにします。
search_history.py
def search_card_history(event, context):'''カードの移動履歴を条件分岐させる(100件ごとの動的並列処理)'''date = get_card_from_done.get_date() # 現在、1週間後、1週間前の日付を取得する。current_time = date[0]one_week_before = date[1]member_dict = get_card_from_done.associate_id_and_name() # メンバーのIDとフルネームを辞書型にして紐づける。post_message = ""for card_list_100 in event:card = client.get_card(card_list_100)move_history = card.listCardMove_date()if len(move_history) == 0:continueelse:latests_move_history = move_history[0]last_move_history = latests_move_history[2]if one_week_before < last_move_history < current_time:created_message = get_card_from_done.create_post_message(card, member_dict)post_message += created_messagereturn post_message
post_slack.py
def chat_postMessage(event, context):post_message = "*週次でDoneへ移動したカード一覧*"card_message = "\n".join(event['input']['post_message'])post_message += card_messageclient.chat_postMessage(channel=settings.SLACK_POST_CHANNEL,username=settings.SLACK_POST_USERNAME,icon_emoji=settings.SLACK_POST_EMOJI,text="",attachments=[{"text": post_message,"fallback": "カードを表示できません","callback_id": "test_card","color": "#0b6dd6","attachment_type": "default"}])
Step Functions デプロイ
serverless.yml
service: サービス名custom:config:accountId: AWSアカウントIDprune:automatic: truenumber: 1region: ${opt:region, self:provider.region}provider:name: awsruntime: python3.8lambdaHashingVersion: 20201221profile: defaultregion: ap-northeast-1stage: prodpackage:exclude:- lib/**functions:cardlist:handler: get_card_from_done.notifytimeout: 300cardhistory:handler: search_history.search_card_historytimeout: 900postslack:handler: post_slack.chat_postMessagetimeout: 300plugins:- serverless-step-functions- serverless-prune-plugin- serverless-python-requirementsstepFunctions:stateMachines:hellostepfunc1:name: outputDoneCardevents:- http:path: createmethod: POSTdefinition:Comment: "output card moved by weekly"StartAt: GetCardFromDoneStates:GetCardFromDone:Type: TaskResource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-cardlist"InputPath: $ResultPath: $.resultNext: HistorySearchMapHistorySearchMap:Type: MapInputPath: $ItemsPath: $.resultIterator:StartAt: HistorySearchStates:HistorySearch:Type: TaskResource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-cardhistory"End: TrueResultPath: $.input.post_messageNext: PostSlackPostSlack:Type: TaskResource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-postslack"End: true
InputPathは、入力フィルタとして指定された部分がLambdaに入力されます。
ResultPathは、実行結果の格納先を指定します。
ItemsPathはMapで使用され、その項目を入力として反復に渡します。
$.data のように指定します。 「$」を指定すると、入力データ全体が引き渡されます。
post_messageのデータを指定したい場合は、$.input.post_messageです。 マネジメントコンソールを確認しながら指定していきます。
これで終わりです。 デプロイして実行を開始すると並列処理により、タイムアウトエラーにならずにカードの一覧が投稿されます。
まとめ
コードは以前のものを使ったので、作業としてはこれだけです。とても簡単に実践できました。
もちろんAmazon CloudWatch Eventsを使ってスケジュール実行もできるので、処理に時間のかかるワークフローをStep Functionsを使って並列処理に変えてみてはいかがでしょうか。
最後まで読んでいただき、ありがとうございました。