こんにちは!トラーナのエンジニアの @m3m0r7 です。 弊社では Laravel 6 を利用していて、重たい処理は Swoole のキューと Amazon SQS と分けて処理するようにしています。
キューの課題
Laravel の queue:listen
を使用していたのですが、Amazon SQS をキュードライバとして使用している場合、キューが 1 つずつしか処理できず、systemd
を用いて複数起動するようにしてはいたものの、パフォーマンスに課題がありました。
queue:listen
は内部的には queue:work
を呼んでいるようで、いわゆる supervisor のような存在なのですが、この queue:work
の数を増やす方法が提供されていなようです。
ちなみに queue:work
は一度キューを拾うと、実行が終了するような仕組みです。
Laravel の Illuminate\Queue\Console\ListenCommand
は以下のようなパラメータしか引き受けていないことがわかります。
<?php protected $signature = 'queue:listen {connection? : The name of connection} {--delay=0 : The number of seconds to delay failed jobs} {--force : Force the worker to run even in maintenance mode} {--memory=128 : The memory limit in megabytes} {--queue= : The queue to listen on} {--sleep=3 : Number of seconds to sleep when no job is available} {--timeout=60 : The number of seconds a child process can run} {--tries=1 : Number of times to attempt a job before logging it failed}';
Laravel Horizon というものもあるのですが残念ながら Amazon SQS には対応しておらず Redis のみの対応となっています。
また、他にもパフォーマンスにも課題がありました。社内システムの Elasticsearch への書き込みが情報量のある複数のインデックスを更新するため、とても重く(1 回あたり 10 秒程度かかる)、同期的に待っているとレスポンスのパフォーマンスが悪くなってしまうので、弊社ではキューを使って課題の解決をしています。
最終的に 複数の queue:work
プロセスを並行並列管理するような仕組みを自作することにしました。
プロセスの破棄と生成
PHP には多重化するために stream_select と呼ばれる関数と、プロセスの双方向通信を実現する proc_open というのがあるので、これを利用して実装をしました。
proc_open で queue:work
のプロセスを指定したプロセス数だけ起動し queue:work
が終了したら、再度同様にプロセスを立ち上げるようにしています。
<?php $processorSize = (int) $this->option('processors'); // ... for ($i = 0; $i < $processorSize; ++$i) { // プロセスが実行済み(もしくは死んでいる)か判定 if (!$this->isAlive($i)) { // 念の為終了処理 $this->closeProcess($i); // プロセスの作成 $this->createProcess($i); } }
isAlive
は書き込み用のストリーム(stdout)、エラー用のストリーム(stderr)、プロセス本体のリソースのいずれかが動作終了もしくは EOF に到達している場合は false
を返すようになっています。
<?php protected function isAlive(int $index): bool { if (!isset($this->processors[$index])) { return false; } [$resource, $out, $err] = $this->processors[$index]; $isResourcesAll = is_resource($out) && is_resource($err) && is_resource($resource); // いずれかがリソースではなくなっているものがある場合 if (!$isResourcesAll) { return false; } $outMeta = stream_get_meta_data($out); $errMeta = stream_get_meta_data($err); $resourceMeta = proc_get_status($resource); return $outMeta['eof'] === false && $errMeta['eof'] === false && $resourceMeta['running'] === true; }
そして closeProcess
でプロセスやストリームの終了をさせます。すでに終了しているプロセスやストリームもあるので、終了できるかどうか分岐してあげる必要があります。
<?php protected function closeProcess(int $index): void { if (!isset($this->processors[$index])) { return; } [$resource, $out, $err] = $this->processors[$index]; if (is_resource($out)) { fclose($out); } if (is_resource($err)) { fclose($err); } if (is_resource($resource)) { proc_close($resource); } unset($this->processors[$index]); }
次に実際にプロセスを起動させるところを記述します。
<?php protected function createProcess(int $index): void { $resource = proc_open( // queue:work を実行するためのコマンド [ 'php', 'artisan', 'queue:work', 'sqs', '--queue=' . config('queue.connections.sqs.queue'), '--memory=256', ], [ // stdout 1 => ['pipe', 'w'], // stderr 2 => ['pipe', 'w'], ], $pipes, getcwd(), // (1) array_merge( $_ENV, [ 'CACHE_DRIVER' => 'array', ], ) ); // それぞれノンブロッキングモードにします。 stream_set_blocking($pipes[1], false); stream_set_blocking($pipes[2], false); $this->processors[$index] = [$resource, $pipes[1], $pipes[2]]; }
(1) … 弊社では Laravel から Redis への接続のためのドライバそのものを Swoole に対応させるために自作しています。その際に $_ENV['CACHE_DRIVER']
に custom-redis-driver
のように値が入ってきます。 proc_open
は 第 5 パラメータを省略した場合親プロセスの$_ENV
をそのまま引き継ぐ仕様になっています。
しかし、Swoole 対応のドライバはコルーチン内もしくはスケジューラ内でしか動作しないため queue:work
にそのまま渡すと、プロセスが一瞬で終了してしまう問題がありました。
Illuminate\Queue\Worker
の getTimestampOfLastQueueRestart()
で $this->cache->get(...)
と呼び出しているところがあるのですが、ここが正常に実行できていないためです。これは queue:work
起動時に呼ばれるdaemon($connectionName, $queue, WorkerOptions $options)
内で呼び出されています。
Redis にわざわざ繋いで保存する必要もないので array
で渡してあげるようにして解決しました。
本当は [...$_ENV, 'CACHE_DRIVER' => 'array']
と書きたかったのですが、この記法は PHP8.1 からなので、未だ使えず。
なお、弊社のチームリードが最近 PHP8.0 へアップグレードしてくれました!
多重化の対応
そして、次に多重化の対応です。複数のプロセスを createProcess
で起動したものを stream_select
で取りまとめて処理をします。
実は Symfony に多重化に対応したものとして Symfony\Component\Process\Process があるのですが、
PHP8.0 への移行の際、Swoole との相性が悪いことが判明しており、あえて stream_select
で実装する方針としました。
stream_select は要約すると監視対象のストリームに書き込み等があれば、参照渡しとしている引数を変更があったものだけに絞ってくれる、といったものです。つまりどのストリームに書き込みがあったのか、ウォッチすることができるので、多重化の際には重宝します。 stream_select 自体は変更のあったストリームの数を返り値として返してくれます。
そこで createProcess
で使っている proc_open
から生えた stdout
, stderr
を監視するようにすればいいのです。
<?php $outs = array_column( $this->processors, 1 ); $errs = array_column( $this->processors, 2 ); $changes = stream_select( $outs, $in, $errs, 0, 200000 );
上記のようにすると $outs
または $errs
に変更があった時に $changes
が 1
以上になり、何かしらの処理を行うことできるようになります。
また $changes
はエラー発生時やタイムアウト時に 0
や false
を返すこともあるので、その場合は、スキップするようにしてあげればよいです。
<?php foreach ($outs as $out) { $index = $this->getIndexByResource( $out, 1 ); if ($index === null) { continue; } $this->processOut($out, $index); } foreach ($errs as $out) { $index = $this->getIndexByResource( $out, 2 ); if ($index === null) { continue; } $this->processErr($out, $index); }
processOut
と processErr
はそれぞれ、以下のようになっています。
<?php protected function processOut($resource, int $index): void { [$pid, $message] = $this->process($resource, $index); if ($message === '') { return; } $this->getOutput()->writeln( "[PID: {$pid}] {$message}" ); } protected function processErr($resource, int $index): void { [$pid, $message] = $this->process($resource, $index); if ($message === '') { return; } $this->getOutput()->writeln( "[PID: {$pid}] {$message}" ); \Log::error($message); $this->closeProcess($index); }
isAlive
から、stream_select
、出力までの上記の処理をひっくるめて、イベントループ(while(true) { ... }
)にしてあげると、常にプロセスの生死を見つつ、多重化の実装ができます。
どれくらい早くなったのか
キューを処理しているサーバーのプロダクションが c5.xlarge
で vCPU が 4
、ステージングが t3.large
で vCPU 2
なのですが、ステージングで本番の倍以上のキューの処理数を実現しているため 4 倍ほどとなりました。
本当はプロセッサアフィニティ等も考えられると、より効率の良い処理になるのですが、まぁそこまでやる必要も一旦ないかなと…。
このような、取り組みをしている弊社に興味があればぜひカジュアル面談からいかがでしょうか!