CUBE SUGAR CONTAINER

技術系のこと書きます。

PySpark の DataFrame を SparkSQL で操作する

Apache Spark には SQL の実行エンジンが組み込まれていて、そのインターフェースは SparkSQL と呼ばれている。 この機能を使うと Spark で主に扱われるデータ構造の DataFrame オブジェクトを SQL で操作できる。 今回は PySpark から DataFrame を SparkSQL で操作する方法について書いてみる。

使った環境は次の通り。 Spark は YARN の上で動作するように環境構築してある。 ただし、今回扱う範囲であれば別にスタンドアロンな環境でも動くはず。

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.17.1.el7.x86_64
$ hadoop version
Hadoop 2.8.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2
Compiled by jdu on 2017-12-05T03:43Z
Compiled with protoc 2.5.0
From source with checksum 9ff4856d824e983fa510d3f843e3f19d
This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar
$ spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161
Branch 
Compiled by user felixcheung on 2017-11-24T23:19:45Z
Revision 
Url 
Type --help for more information.

まずは PySpark のシェルを起動しておこう。

$ pyspark --master yarn

サンプルの DataFrame を用意する

まずは操作対象のサンプルとなる DataFrmae を用意する。 ここでは RDD から作ることにした。 中身はユーザの名前と年齢が記録されたものになる。

>>> rdd = sc.parallelize([
...   ('Alice', 20),
...   ('Bob', 25),
...   ('Carol', 30),
...   ('Daniel', 30),
... ])

これで RDD ができた。

>>> rdd.collect()
[('Alice', 20), ('Bob', 25), ('Carol', 30), ('Daniel', 30)]

DataFrame には RDD と違ってスキーマがあるので、次はそれを定義する。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> schema = StructType([
...   StructField('name', StringType(), False),
...   StructField('age', IntegerType(), False),
... ])

RDD とスキーマの両方が揃ったら DataFrame に変換しよう。

>>> df = spark.createDataFrame(rdd, schema)

この通り、ちゃんと DataFrame ができた。

>>> df.show()
+------+---+
|  name|age|
+------+---+
| Alice| 20|
|   Bob| 25|
| Carol| 30|
|Daniel| 30|
+------+---+

ちなみに、ちっこいデータであれば上記のようにする以外にも次のような作り方もある。 実は、これは正に SparkSQL の機能を使っている。

>>> df = spark.sql('SELECT "Hello, World!" AS message')
>>> df
DataFrame[message: string]
>>> df.show()
+-------------+
|      message|
+-------------+
|Hello, World!|
+-------------+

PySpark のシェルにデフォルトで用意されている spark 変数は SparkSession のインスタンスになっている。 このインスタンスを通して SparkSQL の機能が利用できる。

>>> spark
<pyspark.sql.session.SparkSession object at 0x7fc2959daba8>

SparkSQL で DataFrame を操作する

それでは、先ほど作成した DaraFrame を早速 SparkSQL を使って操作してみる。 それには、まず DataFrame#registerTempTable() というメソッドを使う。 このメソッドを使うと DataFrame を SparkSQL が操作するテーブルとして登録できる。

>>> df.registerTempTable('users')

登録したら、あとは SparkSession#sql() メソッド経由で SQL から先ほどの DataFrame の内容を操作できるようになる。 試しに DataFrame を元に登録したテーブルから全レコードを取り出してみよう。

>>> df = spark.sql('SELECT * FROM users')
>>> df.show()
+------+---+
|  name|age|
+------+---+
| Alice| 20|
|   Bob| 25|
| Carol| 30|
|Daniel| 30|
+------+---+

ばっちり取り出せた。

上記を見て何となく分かるかもしれないけど SparkSQL を使って得られる結果は、やっぱり DataFrame になっている。 ここでは WHERE 句を使って先ほどの DataFrame から絞り込んだ結果を、また新たな DataFrame として取得していることになる。

>>> df = spark.sql('SELECT * FROM users WHERE age < 30')
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 25|
+-----+---+

つまり SparkSQL を使って得た DataFrame をさらに登録して・・・という感じでアドホックに分析を進められる。 さっき取り出した 30 歳未満だけに限定したユーザの DataFrame をテーブルとして登録してみよう。

>>> df.registerTempTable('users_age_under_30')

この通り、ちゃんと登録できた。

>>> df = spark.sql('SELECT * FROM users_age_under_30')
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 25|
+-----+---+

テーブルの揮発性

ちなみに DataFrame#registerTempTable() で登録した内容はメソッド名の通り一次的なものになっている。 そのため PySpark のシェルを終了すると消える。

>>> exit()

一旦シェルを終了してから、再度 PySpark のシェルを立ち上げてテーブルを参照してみることにしよう。

$ pyspark --master yarn

この通り、そんなテーブルはないよと言われる。

>>> spark.sql('SELECT * FROM users').show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/session.py", line 603, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Table or view not found: users; line 1 pos 14'

そのため、何度も使いたいものについては何らかの形で何処かに書き出しておく必要がある。 書き込む先はローカルのディスクなり HDFS なり S3 なり、自分の使っているファイルシステム上になる。

SparkSQL を使わない場合との比較

ちなみに DataFrame は、もちろん SparkSQL を使わずに操作することもできる。 次は SparkSQL を使う場合と使わない場合を軽く比較してみる。

まずは、改めて DataFrame を用意しておく。

>>> users_df = spark.sql('SELECT * FROM users')

PySpark のシェルを終了した場合にはもう一度作り直そう。

SparkSQL を使わずに DataFrame の操作をするときは pyspark.sql.functions を使うことが多いと思う。 とはいえパッケージ名からして sql が入っているんだけど。

>>> from pyspark.sql import functions as F

試しに年齢の平均値を出してみる。 DataFrame#select() で年齢のカラムに pyspark.sql.functions.avg() 関数を適用する。

>>> users_age_avg_df = df.select(F.avg(users_df.age))
>>> users_age_avg_df
DataFrame[avg(age): double]

確認すると、たしかに平均が計算できた。

>>> users_age_avg_df.show()
+--------+
|avg(age)|
+--------+
|   26.25|
+--------+

ちなみに、カラム名を指定するときは Column#alias() メソッドを使う。

>>> users_df.select(F.avg(users_df.age).alias('age_avg')).show()
+-------+
|age_avg|
+-------+
|  26.25|
+-------+

SparkSQL を使うときはこんな感じ。 特に説明は不要だと思う。

>>> spark.sql('SELECT AVG(age) AS age_avg FROM users').show()
+-------+
|age_avg|
+-------+
|  26.25|
+-------+

こうして見ると SparkSQL を使わない場合でも、それに至るまでに扱う API 自体は近いことが分かる。

次は年齢ごとの人数を数えてみよう。 まずは DataFrame#groupBy() メソッドで集約に使うカラムを指定する。

>>> gd = users_df.groupBy(users_df.age)

返ってくるのは GroupedData になる。

>>> gd
<pyspark.sql.group.GroupedData object at 0x1720ed0>

集約した結果をカウントするために GroupedData#count() メソッドを使うと DataFrame が返る。

>>> df = gd.count()
>>> df
DataFrame[age: int, count: bigint]

これで各年齢の人数が分かった。

>>> df.show()
+---+-----+                                                                     
|age|count|
+---+-----+
| 20|    1|
| 25|    1|
| 30|    2|
+---+-----+

SparkSQL を使うときはこんな感じ。 見慣れた SQL そのままだ。

>>> spark.sql('SELECT age, COUNT(1) AS count FROM users GROUP BY age').show()
+---+-----+                                                                     
|age|count|
+---+-----+
| 20|    1|
| 25|    1|
| 30|    2|
+---+-----+

まとめ

今回は PySpark の DataFrame オブジェクトを SparkSQL から操作してみた。 データエンジニアリングの世界では、色々なプログラミング言語や環境、そして API が登場する。 そうした中 Apache Spark ではデータ分析における共通言語ともいえる SQL を使ってデータ構造を操作する選択肢があることはありがたい。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集