はじめに
会員システムグループのkiqkiqです。最近PySparkというライブラリを触ってみたので紹介したいと思います。
Apache Spark・PySparkとは
PySparkは、Pythonを使ってApache Sparkを操作するためのライブラリです。そのApache Sparkというのは、オープンソースの大規模データ処理フレームワークで、高速で汎用的なデータ処理エンジンです。Sparkには主に4つの特徴があります。
- 分散処理 Sparkはクラスター上で分散処理を行うことができ、大量のデータを効率的に処理することができる
- 高速 メモリ内で処理を行い、複数の並列操作でジョブのステップ数を減らすことでデータを再利用することができる
- 汎用性 Sparkはバッチ処理だけではなく、ストリーミング処理、機械学習、グラフ処理などのデータ処理に対応
- 多様なAPI Sparkは、Scala、Java、Python、R など、さまざまなプログラミング言語からアクセスできる
これらの特徴から、Spark関連のものは大規模データ処理、機械学習、ストリーミング処理など、さまざまな用途で利用されています。このブログではPySparkの基本的な機能や簡単な使い方などを紹介していきます。
機能紹介
- Spark SQL
- Spark SQL は、SQLライクなクエリ言語を使ってデータ処理ができる。 SQLを使ったデータ分析や 抽出、変換、ロードのようなタスクを簡単に実行できるもの。
- Spark Streaming
- Spark Streamingは、リアルタイムのデータストリーミング処理を可能にするコンポーネント。Spark Streamingでは数秒ほどの短い間隔に区切られたバッチ処理を繰り返し行い、ストリームデータ処理機能を実現されている。
- MLlib
- MLlibは、Sparkに組み込まれている機械学習のライブラリで、分類、回帰、クラスタリング、共起分析などの主要な機械学習アルゴリズムが用意されている。
- GraphX
- GraphXは、グラフ処理のための Spark のコンポーネントで、ソーシャルネットワーク分析、推薦システム、ルーティングアルゴリズムなど、グラフ構造のデータを扱うことに特化した機能。また、組み込まれているグラフ処理用の API を使って、グラフの作成、変換、分析などの処理を行える。
使い方
環境構築
今回はjupyter notebook
とpyspark
がインストールされたコンテナイメージjupyter/pyspark-notebook
上で実行していきます。以下のdocker-compose.yml
を実行するだけでpyspark
を実行できる環境を構築できます。
また、この記事では気象庁が公開している神戸市の気象データを使用します。https://www.data.jma.go.jp/gmd/risk/obsdl/
実行
PySparkのDataFrameを使用しSpark SQLの使い方を紹介します。
SparkSessionの作成とPySparkのDataFrameへの変換は以下のように行います (また、SparkSessionの作成時にワーカーノード上のExecutorの設定もできる)
PySparkのDataFrameはpandasと同じような操作ができます
PySparkのDataFrameに対してSpark SQLを用いてSQLの操作を行うことができます (普通のselect3件)
このように普通のSQLの実行結果のように出力することができます。
結果の確認
Spark UIの「SQL / DataFrame」タブで実行情報を確認できます

また、SparkSession作成時に設定したExecutor(ワーカーノード上で実行されるプロセス)などの確認はEnvironmentタブやExecutorsタブで確認することができます。
ブログ内で使用した程度のデータの規模だとSparkを使用する意味はほとんどありませんが、大きいデータになれば分散処理による工夫が必要ななるため、Sparkなどが有効になると思います。
おわりに
PySparkを使用すればPandasやSQLを使うように簡単に分散処理を実行することができるので、分散処理に興味がある方や大規模なデータ分析をする方は試してみてください。