こんにちは、VASILYエンジニアの塩崎です。
今回はiQONを支えているクローラーの並列処理について紹介したいと思います。
並列処理の効率化をする過程でresqueを見限りsidekiqに移行した理由、移行時に書き換えた部分などについてもお話ししたいと思います。
iQONのクローラーの並列処理の仕組み
iQONでは毎日数100万点のアイテムのクローリングを行っています。
一度クローリングしたアイテムも毎日1回再クロールし在庫や値下げをiQON内のDBと同期しているため、大量のアイテムを効率良くクロールする仕組みが必要です。
クロールの処理は「ダウンロード」と「スクレイピング」の2つに大きく分けることができます。
このうち、ダウンロードの部分については、提携ECサイトに対して負荷がかからないようにスケジューリングをする必要があります。
この部分の効率化についてVASILYでは分散MutexとResqueを使った並列処理システムを使っています。
ResqueでECサイトごと個別のキューを作り、キューからジョブを取り出し処理をするワーカーもサイトごと個別に作ります。
そして、ダウンロードワーカーは処理を行う前にmutexのロックを行うことによって、1つのECサイトに対して同時に大量のGETリクエストを投げることを防ぎます。
これによって、ECサイト毎に一定の速度でリクエストを発行することができます。
この機能を実現するためにRubyのメタプログラミングを活用し、クラスの動的な生成を行っています。
ダウンロードワーカーは複数のサーバーに分散しているため、このmutexはredisで管理しています。
この辺りの詳細については、以下の記事に詳しく書いてもあります。
Redis::DistMutex - 時限付き分散ロックで効率良くサイトクロールをしよう
Resqueで複数サイトにまたがるクローリングを最適化しよう
ECサイトが増えた時に問題発生
しかし、クロールするべきECサイトが増えた時に問題が発生しました。
前回の記事でもお伝えしましたが、クローリングするECサイトの数を80から400に増やすという施作を実施しました。
その結果クロールする速度が全然スケールせず、サーバーを増やしても増やしてもクロールしきれないという問題が発生しました。
ECサイト数の増加に合わせてサーバー数も5倍に増加させてみましたが、全くクロールしきれませんでした。
ボトルネックの計測
問題解決のためには、ボトルネックの計測をしなければいけません。
以下のような実行時間測定用のメソッドを作成し、怪しそうな部分全部にこのメソッドを適用しました。
ログ集計用のバックエンドにはfluentdとBigQueryを利用しているためデータのスループットについては特に考えずに、少しでも怪しそうな部分全部で実行時間の測定を行いました。
require 'fluent-logger' | |
class Logger | |
def self.measure_execution_time(type) | |
before_time = Time.now | |
res = yield | |
after_time = Time.now | |
elapsed_time = after_time - before_time | |
data = { | |
type: type, | |
elapsed_time: elapsed_time, | |
create_time: Time.now, | |
} | |
Fluent::Logger.post('crawler.execution_time', data) | |
res | |
end | |
end |
Logger.measure_execution_time(1) do | |
# 怪しそうな処理1 | |
end | |
Logger.measure_execution_time(2) do | |
# 怪しそうな処理2 | |
end | |
Logger.measure_execution_time(3) do | |
# 怪しそうな処理3 | |
end |
Resqueue内部でRedisからジョブをデキューしている部分でほとんどの時間を費やしていることが分かりました。
Resque内のボトルネック
Resqueのソースコードを見てみると、デキューをする時にキューを1つ1つ順番にチェックしていました。
# lib/resque/worker.rb:262 | |
queues.each do |queue| | |
log! "Checking #{queue}" | |
if job = Resque.reserve(queue) # 最終的にはLPOP(http://redis.io/commands/lpop)が呼ばれる | |
log! "Found job on #{queue}" | |
return job | |
end | |
end |
Redisには複数のリストから1つのデータを取得するためのAPIであるBLPOPがあるため、このループの実装はムダです。
iQONのクローラーではクロールするECサイトの数だけ動的にキューが作られるので、キューの数も80から400に増えました。
そのため、この部分が大きなボトルネックになっている可能性があります。
sidekiq移行
というわけで、Resqueはキューの数が増えた時にスケールしないのでsidekiqに移行することにしました。
移行にあたっては以下のことを重要視しました。
- ・キューの数が多い時にスケールするかどうか
- ・Resqueからの移行が簡単か
- ・キューやワーカーを動的に生成することができるかどうか
開発環境でこれらのことを検証した結果、sidekiqが最有力候補に残りました。
ソースコードを読んでみても、BRPOPを使ってdequeueをしていることが確認できましたので、キューが増えた時でも効率的にdequeueしていることがわかります。
# lib/sidekiq/fetch.rb:101 | |
def retrieve_work | |
work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) } | |
UnitOfWork.new(*work) if work | |
end |
また、sidekiq移行をすることで以下のようなメリットも期待できました。
- ・並列化がプロセスレベルではなくスレッドレベルであるため、forkのオーバーヘッドがない
- ・キューからジョブを取得するときに、キューのpriorityを毎回shuffleできる
- ・web UIがキレイ
最終的には公式 に書かれていた
What if 1 Sidekiq process could do the job of 20 Resque or DelayedJob processes?
という挑発的な文言に煽られて導入を決意しました。
移行にあたって
Resqueからsidekiqへの移行はhttps://github.com/mperham/sidekiq/wiki/Resque-Compatibility を参考にしました。
ワーカー、キューの動的な生成
基本的にはほとんどいじる部分がなさそうですが、iQONのクローラーはECサイトごとにキューやワーカーを動的に生成しているため、その部分を考慮する必要があります。
以下のようにしてECサイトごとのワーカー、キューを動的に生成しています。
module Crawler | |
class DomainSpecificWorker | |
include Sidekiq::Worker | |
def self.domainify(domain_id) | |
klass_name = "#{self.name.split('::').last}_#{domain_id}" | |
unless Crawler.const_defined?(klass_name) | |
Crawler.const_set(klass_name.to_sym, self.clone) | |
klass = Crawler.const_get(klass_name.to_sym) | |
queue = klass.class_variable_get(:@@queue).to_s + "-#{domain_id}" | |
klass.sidekiq_options(queue: queue.to_sym) | |
klass.class_variable_set(:@@queue, queue) | |
end | |
Crawler.const_get(klass_name.to_sym) | |
end | |
end | |
class HogeWorker < DomainSpecificWorker | |
@@queue = :hoge | |
def perform(pamram) | |
# do something | |
end | |
end | |
end |
エンキューするときには次のようにします。
Sidekiq::Client.enqueue(HogeWorker.domainify(1), param1) | |
Sidekiq::Client.enqueue(HogeWorker.domainify(2), param2) | |
Sidekiq::Client.enqueue(HogeWorker.domainify(3), param3) |
このときにhoge-1, hoge-2, hoge-3という3つのキューが動的に生成され、それぞれに対して1つずつジョブがエンキューされます。 また、エンキューされたジョブを処理するためのワーカーは以下のようにして起動します。
キュー名の後ろに",1"をつけることによってdequeueをするときのpriorityをランダムにすることができます。
これを指定しないとdomain_idの小さいキューが毎回最優先でdequeueされ、分散mutexの競合が頻繁に発生してしまいます。
domain_ids = [1, 2, 3] | |
option = domain_ids.map{|domain_id| "-q hoge-#{domain_id},1"}.join(' ') | |
cli = Sidekiq::CLI.instance | |
cli.parse(option.split(' ')) | |
cli.run |
移行した結果
resqueを捨て、sidekiqに移行したことによってRedisからのdequeueのオーバーヘッドが大幅に減り、クロール速度が大幅に向上しました。
また、forkのオーバーヘッドが減ることによって1サーバーあたりのワーカー数を4から18に増やすこともできました。
そのほかにも諸々の改修を行うことによって、クローラー用サーバー管理費を1/10にすることも達成しました。
後日談
Resqueの最新版ではBLPOPを使用しているみたいでした。
ですが、最新版でもforkのオーバーヘッドは依然として存在しているので、やはりsidekiq移行はしてよかったと思います。
まとめ
クロールするECサイトが増えるに従って、Resqueではスケールしないような状況になってしまいました。
そのため、dequeueが効率化されているsidekiqを導入しました。
結果としてクロール速度の大幅な向上とサーバー管理費を1/10にすることを達成しました。
最後に
VASILYでは一緒に働くことができる優秀なエンジニアを募集しています。
世界一のファッションデータベースを作ることに興味のある方は是非ご応募ください。