DataflowとBigQueryで始める大規模データ分析基盤実装入門
アーカイブ動画
データ分析基盤構築における考え方とシステムアーキテクチャ
佐藤 太一氏
株式会社電通国際情報サービス(ISID)
Xイノベーション本部 ソフトウェアデザインセンター
今回登壇した佐藤太一氏が所属する電通国際情報サービス(以下、ISID)のXイノベーション本部は、全社横断的な研究開発部門。佐藤氏はGitHubやJIRAなどの現代的な構成管理ツールの利用促進や部門横断的な技術支援、会社の制度改善などに取り組んでいる。
佐藤氏はバックオフィス系のコーポレート本部システム推進部にも所属しており、今回はシステム推進部の業務として立ち上げた分析基盤の構築から得られた知見が語られた。
まずデータ分析基盤構築において重要なのは、以下に挙げた3つの考え方である。
1.オンプレミスのサーバ上にデータ分析に関する役割をできるだけ残さない
例えば、文字コードをEUC-JPからUTF-8に変換する、LZHや7-Zipで圧縮されているファイルを展開するなど、小さなことでもすべてクラウド上で実施すること。データ量が多くなってくると、ちょっとした処理でも途端に巨大なサーバーが必要になるからである。
2.スケールアウトしやすい処理方式にする
クラウドを利用する際は、冪等性を意識した処理構造にして、同じ入力に対しては何度実行しても同じ結果が得られるようにする「スケールアウト型の処理方式」が適している。スケールアウト型の処理方式はストレージを潤沢に使うので、分割や圧縮で容量を最適化するのではなく、同じデータをさまざまなロケーションにばらまくことが重要となる。
3.データ更新の頻度は日時程度にする
最初は週次ぐらいの想定で形成するのが望ましい。データの更新頻度は、1時間を切るあたりから要素技術の難易度が上がり、複雑な対応が必要になるからだ。
今回、紹介するシステムアーキテクチャは以下図の通り。
「オンプレミスのサーバー群からGoogle Cloud Platform(GCP)にデータを送信する。データを書き込むのはCloud Storage。オンプレミス側で用意するサーバーの役割は、できる限りCloud Storageの書き込みのみに絞り込みましょう」(佐藤氏)
Cloud Storageのバケットにファイルが書き込まれたら、そのCloud FunctionsイベントがCloud Functionsに通知される。Cloud Functionsのアプリケーションは冪等性のあるものにすること。繰り返し同じファイルがCloud Storageに書き込まれても、同じように振る舞うように実装できるからだ。
Cloud Functionsに期待する振る舞いは、Dataflowを実行すること。ジョブが成功しようと失敗しようが、Cloud Storageにファイルが書き込まれると、何度でもDataflowを呼び出すことができる。
なおDataflowは、バッチ処理を実行するためのプラットフォーム。Dataflowの役割はCloud Storageに保存されたファイルを読み取って抽出したり、加工した上でBigQueryのテーブルに書き込むことである。
Dataflowは抽象化された高度な分散処理能力があり、データ量に応じて必要なサーバーを増減してくれる。つまり、スケールアウト型のバッチ処理基盤というわけだ。
「個人的には不要になったサーバーを縮退してくれる点が気に入っています」(佐藤氏)
BigQueryに入れば、SQLを使ってデータ分析をするだけとなる。BigQueryは巨大なデータを効率良く分散処理しながら、SQLの結果を返すスケールアウト型のサービス。クエリ単位の従量課金モデルなので、課金制限をつけた上で利用する方がお勧めだという。
オンプレミスで使えるRDBとクエリの考え方が違うので、最初はコストの表示を意識しながら使うこと。BigQueryではテーブルの全ての内容が必ずしも単一のサーバー内に収まっているわけではないので、unionやexistsを使ったクエリのコストは高くなりやすいからだ。一方で、joinのコストは想定より低くなる傾向がある。
BigQueryに格納されたデータは、Data Portalで可視化できる。BigQueryの結果の画面にData Portalへの連携機能が用意されているので、それを使えばシームレスにグラフを作ることができる。
ここでQ&Aタイムが設けられた。
Q.CloudFunctionsの呼び出し回数が少ないのはなぜか
佐藤:Cloud Functionsが想定するユースケースとは少しずれているためです。例えば、HTTPリクエストをエンドユーザーから受けるサービスと、ファイルがストレージに書き込まれるのをトリガーにCloud Functionsを動かすケースとでは、スケールが異なります。今回のシステムでは、Cloud Storageに書き込まれたファイルの数しか実行されない。結果的に非常に呼び出し回数が少なくなります。
Q.他に候補となったアーキテクチャはあったか
佐藤:AWSやAzure、Splunkを使ったアプローチも検討しましたが、予算や工数の都合で、今回のアーキテクチャを採用しました。
Q.冪等性を確保する手段として採用される方式について
佐藤:冪等性を確保する際は、データベースにレコードが既にあるかを確認するアプローチは取りません。なぜなら、2回目以降のコストが上がってしまうからです。そこでデータのレコードは基本的に追記するようにして、イベントがトリガーされた分だけ、Cloud Functionsが動き、BigQuery側にデータが入っていく形にしています。データは重複しますが、BigQueryのDISTINCT句を使って、重複排除したものをData Portalに流すことで、冪等性を担保しています。
Q.データ分析のワークフローでは、Cloud Functionsで事足りるユースケースが多いのか?
佐藤:Cloud Composerと似ている役割を持っているのがDataflowです。Cloud Composerは便利なサービスですが、今回は私がJavaのコードを書けることもあり、Dataflowを使いました。もちろんCloud Composerを使ったアプローチもあり得ると思います。
オンプレミスからGCSにデータをアップロードする
続いて、システムの詳細が説明された。まず、オンプレミスにあるデータをGoogle Cloud Storage(GCS)にアップロードする方法については、「オンプレミスにあるデータにどれだけの価値があるかわからないうちは、データ転送に大きなコストをかけない方がいい」と佐藤氏。
データ分析基盤は、会社のデータを閲覧できる権限を持つ人だけをファーストユーザーにすることが望ましいと考えた佐藤氏は、会社の執行役員をファーストユーザーに設定した。
このような用途に向いているツールとして紹介されたのが、Goで実装されたマルチプラットフォームで動作するオープンソースのファイル転送ソフトウェア「RCLONE」だ。
特徴は、非常に多くのストレージサービスに対応していること。SFTPやWebDavのような標準化されたプロトコルはもちろん、多くのS3互換サービスに対応している。佐藤氏は、転送先にないファイルだけを選んで転送する機能「rclone copyコマンド」がお気に入りだという。
このコマンドであれば、万一、オンプレミスからGCSにファイル送信完了後に、オンプレミスのファイルを削除し、その後に再度rclone copyコマンドを実行しても、GCSに置かれたファイルは何の影響も受けない。rclone syncコマンドのように、うっかりファイルを消してしまうことなど生じないのだ。
RCLONEを使ったファイル転送手順は、次のようになる。
- GCSにバケットを作り、サービスアカウントとサービスアカウントキーを用意
- rclone configコマンドでGSCへのアクセスをセットアップ
- rclone copyコマンドを定期実行してファイルを転送
Cloud Functionsを使って、Dataflowを実行する方法
次に説明されたのは、Cloud Functionsを使って、Dataflowを実行する方法についてだ。GCSのファイルの書き込みが終わった時、更新イベントを通知する先が、Cloud Functionsとなる。
佐藤氏は「ビルド環境の一貫性を維持するためにJavaで実装したが、どの言語でも手間は変わらないので、自分が慣れている言語で実装するのがお勧め」と補足している。
ビルドスクリプトはGladleを使用して記述する。まずはビルドスクリプトの前半部分。
プラグインとして利用するのは、javaプラグインとshadowプラグイン。記述したJavaのアプリケーションを依存ライブラリと一つのファイルにまとめてくれるプラグインである。
java.toolchainで始まる部分は、ビルドに使うJavaのコンパイラやコアライブラリを、Gladleが一致させてくれる機能。これによってビルドの再現性を高めることができる。
次のコードはこのプロジェクト内においてJavaのコードをコンパイルする際にファイルのエンコーディングをUTF-8とみなすものである。
最後のコードはshadowプラグインのための設定。Javaファイルの中にはMETA-INFディレクトリがあり、それぞれ内容の違うファイルが格納されている。これらはUber jarが動作すると、基本的にトラブルの元となるので一律に排除するようにしている。
後半部分は主に依存ライブラリの宣言を行っている。
まずGladleのプラットフォーム機能を使って、google.cloudのbomを指定する。bomはいくつかのライブラリを束ねたもの。互換性のある組み合わせは無数にあり、その詳細を組み合わせるのは極めて難しい。そのため、Googleが公式に妥当な互換性で動くライブラリの組み合わせをまとめてくれている。それがbomというわけだ。
guavaはJavaの便利なクラスの詰め合わせで、functins-framework-apiはCloud Functions用のフレームワークである。これらの規約に沿って実装することで、Cloud Functionsにデプロイできるクラスになる。
google-cloud-data-flowは、Dataflowを操作するためのクライアントライブラリだ。Dataflowのクライアントライブラリは他にもあるが、きちんとメンテナンスされているのはこれだけだという。
Google-auth-library-paitj2-httpはその名の通り、GCP上で動作するOAuth用のライブラリ。これがあることで、Dataflowを呼び出すコード内において、明示的に設定しなくても正しく認証情報がCloud FunctionsとDataflowの間で連携される。
残り2つは言語のライブラリ。stf4jはインターフェース部分が定義されているライブラリで、logbackはその実装の一つだ。
Cloud Functionsで動作するアプリケーションの実装とデプロイ
いよいよJavaのコード実装に説明が移る。まずはCloud Storageのイベントを受け取るために、JavaBeans規約に基づいた2つのプロパティがあるクラスを定義する。
次にCloud Functionsの本体部分は、Cloud Functions上で動作するアプリケーションを作るには、BackgroundFunctionインターフェースを実装する。
acceptメソッドでは、Dataflowのクライアントライブラリを使ってジョブを実行。処理の起点となるクラスが、FlexTemplatesServiceClientだ。システムリソースを消費するタイプのクラスなので、try-with-resources構文で、処理が終了した後にはリソースが開放されるように実装する。
これを使うために3つのクラスをセットアップ。最初にFlexTemplateRuntimeEnviromentを作る。これはDataflowのジョブが動く環境について指示を出すものだ。
次にDataflowを実行する上で必要な書き込み領域とジョブを実行するサービスアカウント、ジョブが実行されるゾーンを指定する。ちなみに、asia-norheaset1-cは東京を意味する。
動く環境ができたら、Dataflowに渡すパラメータであるLaunchFlexTemplateParameterを構築する。FlexTemplatesは、簡単に言うとジョブに引き渡すパラメータが定義されたJSONファイルである。
ここではジョブ名としてtechplay-jobという固定値にしているが、「この部分は非常に工夫のしがいがあるところ」と佐藤氏は説明する。佐藤氏は、ジョブ名の末尾にインプットファイルのハッシュ値をつけることで、同じファイルのジョブが複数回起動しているかどうかを調べるようにしているという。
入力パラメータを構成後、LaunchFlexTemplateRequestの構築を行う。既に構成したパラメータオブジェクトを設定した上で、プロジェクトIDとリージョンを設定する。最後にLaunchFlexTemplateRequestオブジェクトを渡せば、Dataflowのジョブが実行できる。
これでアプリケーションの実装は完了。あとはデプロイをするだけである。
デプロイはGoogle Cloud SDKを使うこともできるが、渡すパラメータが非常に多く、ビルドプロセスの一貫性を維持するために、Gladleのタスクを定義している。具体的にはdeployタスクは依存するタスクとしてshadowjarを指定し、デプロイを実施する前にビルドしている。
コマンドラインオプションの構築については、いくつか注意することがある。まずはprojectオプションとtorigger-resourceオプションは、自分で作成したモノを指定すること。次にenv-vars-fileオプションでは、Yamlファイルを指定する。Cloud Functions内で参照できる環境変数を定義するためである。sourceオプションではshadowタスクに構成されている出力ファイルの絶対パスを指定する。これがGladleにデプロイタスクを定義するメリットだ。
OSのコマンドを起動する部分についても細かな調整をしていると、佐藤氏は話す。
「最下段に書いてあるようにbatファイルを別途用意。Windows上でビルドスクリプトを実行する際に使うようにしました」(佐藤氏)
GCSにファイルが書き込まれた際に、通知されるイベントを受信するCloud Functionsの実装手順をまとめると、次のようになる。
- gradle initで新規プロジェクトを作りビルドスクリプトを編集
- GCSのイベントを受信するためのCloud Functionsアプリケーションから、Dataflowのジョブを起動
- ビルドスクリプトの一部として、デプロイ実行タスクを定義
Dataflowによるバッチジョブの実装方法
続いて、Dataflowによるバッチジョブの実装方法が説明された。Cloud Functionsから呼び出されたDataflowのジョブが、Cloud Storageからデータを読み取り、抽出や加工されてBigQueryに書き込まれる。
Dataflowとは、オープンソースソフトウェアであるApache BeamをGCP上でフルマネージドサービスとして提供しているサービスである。Apache Beamは、バッチ処理やストリーム処理を記述するためのフレームワーク。Javaで実装されているが、GoやPython、SQLなどの言語でも処理を記述することができる。
Javaで実装するため、WindowsやMac、Linuxなど様々なOSの上で動作する。加えてオンプレミス上で動作させる際には、FlinkやSamza、Spark、Hazelcastなどの分散処理環境上で、Beam用に記述したアプリケーションをデプロイできる。
極めてポータビリティ性が高いことも特徴の一つである。そしてApache Beamの実行環境の中で高い利便性を持つのが、GCP上で動作するDataflowである。
「Dataflowは費用を抑えながらも安定的に処理をすることができる。個人的にはワーカーノードの縮退が迅速に行われるところに感動しました」(佐藤氏)
Beamの実行基盤としてGCPを使うメリットは、高速なスケールアウトとスケールインができること。そしてCloud Loggingが使えること。GCSやBigQueryとのシームレスな連携ができることである。
また、Beamを使ったコードでは、PipelineとPCollectionのコンセプトを理解する必要がある。Pipelineとは、複数のステップから構成される処理全体を表すオブジェクト。PCollectionは、Pipelineを流れるデータの集合を表すオブジェクトである。
Dataflowにおけるビルドスクリプトについては、前半部分はCloud Functionsの実装した内容とほぼ同じ。Javaのバージョンで11を使用しているのは、Dataflowが公式にサポートしているDockerベースイメージがJava11だからである。
BeamのSDKを利用するには、3つのライブラリに依存する形になる。1つ目がBeamのcoreライブラリ。「バッチ処理を記述する上で理解すべきことは、すべてこの中に入っています」と佐藤氏。
2つ目以降は、BeamをGCP上で動作させるために必要なライブラリとなる。「これらについてはおまじない程度に理解しておけば大丈夫です」(佐藤氏)
次は、Loggingライブラリに関するセットアップ。Dataflowのワーカーノードでは、JavaのCoreAPIを使って出力されたものをCloud Loggingに転送する。slf4jの実装としては、java.util.loggingしか使えない。CSVのライブラリはCommons CSVを使用している。
入力パラメータを受け取るインターフェースを定義したら、同じ内容のメタデータを定義したJSONファイルを用意。配列の各要素にはネームとラベル、ヘルプテキストを定義する。特にネームはJavaでインターフェースに定義したプロパティ名と一致している必要があるので、要注意ポイントだ。
ここまで準備ができたら、Pipelineを定義していく。このPipelineではGCSに配置されたCSVファイルを読み込み、それを1行ずつ分割した後にBigQueryのレコードとして投入するという処理を実行している。
「たったこれだけのコードで、何十万ものCSVファイルがあろうが、問題なくBigQueryに投入できます」(佐藤氏)
Dataflowで動作するアプリケーションが実装できたら、最後はデプロイを行う。Cloud Functionsで説明した話と似ているが、sdk-lanuageオプションとflex-template-base-imageオプションは、一貫性のある指定をすることがポイントだ。今回はCommons Java11を指定している。
deployタスクでは、ローカルでビルド後に自動生成されたDockerファイルとビルドがcloudbuildに送信される。Dockerコンテナがビルドされた後に、image-gcr-pathオプションに指定した場所にDockerコンテナイメージがデプロイされる。
envオプションで指定しているのは、Dockerコンテナが実行される際に渡される環境変数。ここではジョブを定義したクラスのFQDNを指定している。
下記画面はDataflowでジョブを動かしたときのものだ。ジョブに関する詳細が容易に把握できる。
大規模データ分析基盤を構築するために必要なコストとは
最後は、コスト観について。Cloud Storageは10TBの容量のスタンダードストレージを採用しており、現在使用している容量は2TB~3TB。月額は109ドルである。
「約4~5年分のログを格納できている。来年以降も急増することはないので十分」と佐藤氏。つまりISIDの規模であれば、100ドル前後で毎月使えるということだ。
Cloud Functionsの使用料は1ドル未満。DataflowとCompute Engineは月額252ドル。内訳としてはDataflowが約10ドル、仮想マシンが約240ドルだ。
「パイプラインの作りをもう少し工夫すればコストを低減できるが、現時点では気にする金額ではない」(佐藤氏)
BigQueryは月額2010ドル。他のサービスに比べると高額に見えるが、これはオンプレ上で実施していたいくつかの業務をGCP上に移行したためである。現在、4~5人の運用担当者が既存業務をBigQueryで実現できるか検証しており、そのクエリに制限をかけていないからだという。運用が確定すればクエリの頻度が下がるので、コストは低減できる。
最後に佐藤氏は次のようなアドバイスを語り、セッションを締めた。
「クラウドではストレージを潤沢に使い、スケールアウト型のシステムを作ることをお勧めします。GCPで安価にデータ分析基盤を作りましょう」(佐藤氏)
最後まで質問が絶えなかったQ&Aタイム
Q&Aタイムでは多くの質問が寄せられた。いくつか抜粋して紹介する。
Q.初心者は何から始めたらいいか
佐藤:まずは、Dataflowのチュートリアルを触ってみることをお勧めします。そして、本日説明した内容をある程度抑えた上で、GCPにプロジェクトを作り、実際に動かしてみることです。
Q.BigQueryへのクエリで余計なデータを操作しないために行っていることは?
佐藤:テーブルを定義するときに、パーティショニングを行うこと。全てのカラムにタイムスタンプを入れ、これを基準にパーティショニングしています。タイムスタンプをWHERE句に入れなければ、SQLが発行できない状態になります。もう一つはコストリミットをかけることです。
Q.各種ライブラリ・開発言語のUpgradeなど、技術負債化させない施策は?
佐藤:ライブラリのアップデートはCIサーバーのGitHub Actionの上で実施しているので、定期的にバージョンアップされます。ライブラリのバージョンという意味では技術負債は於きづらい状態になっています。Continues integrationとContinues deployによって、リスクを低減する措置も行っています。
Q.分析前のクレンジングや前処理もDataflowで行っているのか?
佐藤:データ発生源側でのデータ品質は全く担保できていません。分析前のクレンジング前処理はDataflowで行っていますが、それはフォーマットに関わる部分です。それを行った上でBigQueryにデータを投入し、SQLでできる範囲のデータクレンジングを定期的にBigQuery側で一次データを格納するテーブルと分析用テーブルを明確に分けて行っています。分析用テーブルにはできる限りクリーンなデータが入るようにしています。