KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

まとまったデータを小分けにして一定間隔で順番に並列リクエストする with Typescript

こんにちは☃️
プラットフォームチームの石黒です。

こちらは株式会社カケハシ x TypeScriptアドベントカレンダー2021 13日目の記事です。

今回は、Typescriptでの直列リクエストと並列リクエストを組み合わせた実装についてお話しします。

要件

  • データの一括処理をしたい
  • データ量はリクエストごとに異なる
  • 処理するシステムは1件ずつしか受け付けられない
  • 処理するシステムのクォータは50件/秒とする

実装方針

データ量が一定でないので、クォータの50件を超過しても問題ないように実装したいところです。
処理システムは1件ずつしか受け付けられないですが、1秒の間隔を空ければ最大50件は並列でリクエストを送信できそうです。
ということで、データの分割をするだけでなく、間隔を空けて順番にリクエストする方法を考えていきます。

下準備

入出力値の型定義

新年の計は型定義にありということで、入出力の型をそれぞれ定義します。
入力値はシンプルにidだけ、レスポンスは入力値に結果を付与したものとします。

type SampleRequest = {
  id: number
}

type SampleResponse = SampleRequest & {
  result: boolean
}

データの分割

リクエストデータを50件ずつの塊に分割します。こちらは、You-Dont-Need-Lodash-Underscoreをもとに関数を用意しました。

const chunk = <T>(seq: T[], size: number) => seq.reduce((acc, x, index) => (index % size === 0
  ? [...acc, [x]]
  : [...acc.slice(0, -1), [...acc.slice(-1)[0], x]]), [] as T[][])

間隔を空ける

塊にしたデータを一定間隔を空けてリクエストするために、sleep関数を用意します。

// eslint-disable-next-line no-promise-executor-return
const sleep = (msec: number) => new Promise(resolve => setTimeout(resolve, msec))

リクエスト処理

ひとつの塊の中で1件ずつリクエストを行いたいので、Promise.allを使いクォータぎりぎりまで並列リクエストできるようにしておきます。

const sendRequest = (request: SampleRequest) => request.id % 5 === 0 ? Promise.resolve('ok') : Promise.reject(new Error('ng'))

const bulkSendRequest = (requests: SampleRequest[]): Promise<SampleResponse[]> => {
  logger.info('bulk send request')
  return Promise.all(requests.map(req => {
    logger.info(`id: ${req.id}`)
    return sendRequest(req)
      .then(() => ({...req, result: true}))
      .catch(() => ({...req, result: false}))
  }))
}

やってみる

準備していた関数を組み合わせて、入力データを分割、一定間隔で並列リクエストしてみましょう。
今回の場合、chunk関数で分割されたデータ currentChunk は入力データなので SampleRequest[] 型ですが、 bulkSendRequest() にてリクエストを送った結果は Promise<SampleResponse[]> となるので、明示的に previous の型宣言を行う必要があります。
さらに、reduceの第2引数であるinitialValueには初期値として空配列を渡したいところですが、 previous の初期値なのでPromise化して Promise.resolve([]) としました。
個人的にここで正しく型推論されずエラーとなり苦しんだのですが、コンパイラにきちんと型を教えてあげることが大事なのですね。。
型があることで、データがどのような移り変わりをするのかコードを見るだけで把握できるのはとてもありがたいです。

export const callBulkSendRequestInSequence = (
  requests: SampleRequest[],
  concurrency = 50,
  interval = 1000
) => chunk(requests, concurrency).reduce(
  async (previous: Promise<SampleResponse[]>, currentChunk: SampleRequest[], idx) => {
    const tempPrev = await previous
    logger.info(`start chunk ${idx + 1}`)
    const currentResults = await bulkSendRequest(currentChunk)
    await sleep(interval)
    return [...tempPrev, ...currentResults]
  }
  , Promise.resolve([])
)

それでは、実際に実行してみます。
今回は、検証のため同時リクエスト数を少なく(3件)、間隔を広く(2秒)設定してみたいと思います。
(分かりやすくするためタイムスタンプを表示したく、loggerとconsoleのログがコード上に混ざっています🙇‍♂️)

const testData = [
  {id: 1},
  {id: 2},
  {id: 3},
  {id: 4},
  {id: 5},
  {id: 6},
  {id: 7},
  {id: 8},
  {id: 9},
  {id: 10},
]

callBulkSendRequestInSequence(testData, 3, 2000).then(values => console.log(values))
// 実行ログ
2021-12-09 19:20:42 INFO system start chunk 1
2021-12-09 19:20:42 INFO system bulk send request
2021-12-09 19:20:42 INFO system id: 1
2021-12-09 19:20:42 INFO system id: 2
2021-12-09 19:20:42 INFO system id: 3
2021-12-09 19:20:44 INFO system start chunk 2
2021-12-09 19:20:44 INFO system bulk send request
2021-12-09 19:20:44 INFO system id: 4
2021-12-09 19:20:44 INFO system id: 5
2021-12-09 19:20:44 INFO system id: 6
2021-12-09 19:20:46 INFO system start chunk 3
2021-12-09 19:20:46 INFO system bulk send request
2021-12-09 19:20:46 INFO system id: 7
2021-12-09 19:20:46 INFO system id: 8
2021-12-09 19:20:46 INFO system id: 9
2021-12-09 19:20:48 INFO system start chunk 4
2021-12-09 19:20:48 INFO system bulk send request
2021-12-09 19:20:48 INFO system id: 10

// 実行結果
[
  { id: 1, result: false },
  { id: 2, result: false },
  { id: 3, result: false },
  { id: 4, result: false },
  { id: 5, result: true },
  { id: 6, result: false },
  { id: 7, result: false },
  { id: 8, result: false },
  { id: 9, result: false },
  { id: 10, result: true }
]

ログから、2秒毎、3件ずつ実行されていることが分かります。
また、今回は5の剰余が0のときのみtrueを返すようにしたので、結果をみるとidが5と10のときのみtrueとなっています。

まとめ

入力データを分割し、一定間隔でリクエストする実装の紹介をしました。
当初はライブラリを使うことを検討していたのですが、微妙にユースケースが合わず、自前実装してみました。
サンプルコードではクォータぎりぎりを攻めていますが笑、実際には、複数ユーザーから実行される可能性を考慮し並列数を下げたり、インターバルにゆとりをもたせたりと値の設定は要検討です。