CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: pandas でダミー変数を生成する

今回は pandas を使ってダミー変数を生成する方法について書く。 ダミー変数というのは、例えば国籍や性別といった名義尺度の説明変数を数値に変換する手法のこと。 名義尺度は順序関係を持たないので、単純に取りうる値に対して連番を振るようなやり方では上手くいかない。 そこで、特定の状態や状況について「あり・なし」や「Yes/No」を 01 で表現することになる。 具体例については後述する。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.13.3
BuildVersion:   17D102
$ python -V
Python 3.6.4
$ pip list --format=columns | grep pandas
pandas          0.22.0 

下準備

まずは pandas をインストールして Python の REPL を起動する。

$ pip install pandas
$ python

続いてサンプルとなる DataFrame オブジェクトを用意する。 内容としてはユーザ情報を模したものにした。

>>> import pandas as pd
>>> import numpy as np
>>> data = [
...     ('Alice', 10, 'F'),
...     ('Bob', 20, 'M'),
...     ('Carol', 30, 'O'),
...     ('Daniel', 40, 'M'),
...     ('Eric', 50, np.nan),
... ]
>>> columns = ['name', 'age', 'gender']
>>> df = pd.DataFrame(data, columns=columns)

できあがった DataFrame は次の通り。

>>> df
     name  age gender
0   Alice   10      F
1     Bob   20      M
2   Carol   30      O
3  Daniel   40      M
4    Eric   50    NaN

性別からダミー変数を生成する

先ほど確認した通りサンプルの DataFrame には性別が格納されている。 値については M が男性で F が女性、O はその他で NaN は不明を表している。 まずは、この性別からダミー変数を生成してみよう。

pandas には get_dummies() という、そのものずばりな関数が用意されている。 この関数に Series オブジェクト (カラム) を渡せばダミー変数を自動で作ってくれる。

>>> pd.get_dummies(df['gender'])
   F  M  O
0  1  0  0
1  0  1  0
2  0  0  1
3  0  1  0
4  0  0  0

ちゃんとカラムに格納されている内容を元に、それに該当する行だけ 1 になるダミー変数ができた。

生成されたダミー変数は DataFrame オブジェクトになっている。

>>> dummy_df = pd.get_dummies(df['gender'])
>>> type(dummy_df)
<class 'pandas.core.frame.DataFrame'>

生成したダミー変数を元に DataFrame と一緒に使いたいときは、次のようにして連結すれば良い。

>>> pd.concat([df, dummy_df], axis=1)
     name  age gender  F  M  O
0   Alice   10      F  1  0  0
1     Bob   20      M  0  1  0
2   Carol   30      O  0  0  1
3  Daniel   40      M  0  1  0
4    Eric   50    NaN  0  0  0

ところで、先ほどの例でどのダミー変数にもビットが立っていないものがあることに気づいただろうか? 具体的にはインデックス番号が 4 の Eric の行で、性別が NaN になっていた。 get_dummies() 関数はデフォルトでは NaN についてはダミー変数を作らない。 もし作りたいときは dummy_na オプションを True に指定しよう。

>>> pd.get_dummies(df['gender'], dummy_na=True)
   F  M  O  NaN
0  1  0  0    0
1  0  1  0    0
2  0  0  1    0
3  0  1  0    0
4  0  0  0    1

ダミー変数の名前に特定のプレフィックスを付与したいときは prefix オプションを指定する。

>>> pd.get_dummies(df['gender'], prefix='gender')
   gender_F  gender_M  gender_O
0         1         0         0
1         0         1         0
2         0         0         1
3         0         1         0
4         0         0         0

名前についてはダミー変数を生成した後からカラム名を変更しても構わない。

>>> dummy_df.columns = ['gender_f', 'gender_m', 'gender_o']
>>> dummy_df
   gender_f  gender_m  gender_o
0         1         0         0
1         0         1         0
2         0         0         1
3         0         1         0
4         0         0         0

自分でダミー変数を作ってみる

先ほどの例では get_dummies() 関数を使うことでカラムから自動でダミー変数を作る方法を紹介した。 とはいえ get_dummies() 関数を使うより自分で作ったほうが手っ取り早いこともある。

例えば 20 歳以上かを表すダミー変数 adult を作ってみることにしよう。 この場合、次のように年齢のカラムから真偽値を作って整数に変換すると楽にできる。

>>> adult = (df['age'] >= 20).astype(np.int64)

名前さえ直してやれば、ちゃんとお目当てのダミー変数ができている。

>>> adult.name = 'adult'
>>> adult
0    0
1    1
2    1
3    1
4    1
Name: adult, dtype: int64

あとは元の DataFrame と連結するだけ。

>>> pd.concat([df, adult], axis=1)
     name  age gender  adult
0   Alice   10      F      0
1     Bob   20      M      1
2   Carol   30      O      1
3  Daniel   40      M      1
4    Eric   50    NaN      1

ばっちり。

めでたしめでたし。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Python: pandas の DataFrame から不要なカラムを削除する

今回は pandas の DataFrame オブジェクトから不要なカラムを取り除く方法について書く。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.13.3
BuildVersion:   17D102
$ python -V
Python 3.6.4

下準備

まずは環境に pandas をインストールしておく。

$ pip install pandas

続いてサンプルとなる DataFrame を用意する。 今回は、次のようにユーザを模したデータを用意してみた。

>>> import pandas as pd
>>> from datetime import date
>>> columns = ['id', 'name', 'gender', 'birth', 'country']
>>> data = [
...     (1, 'Alice', 'F', date(1980, 1, 1), 'US'),
...     (2, 'Bob', 'M', date(2000, 3, 3), 'UK'),
...     (3, 'Carol', 'M', date(1990, 2, 2), 'AU'),
... ]
>>> df = pd.DataFrame(data, columns=columns)

できあがった DataFrame はこんな感じ。

>>> df
   id   name gender       birth country
0   1  Alice      F  1980-01-01      US
1   2    Bob      M  2000-03-03      UK
2   3  Carol      M  1990-02-02      AU

非破壊的にカラムを取り除く

まずは非破壊的にカラムを取り除く方法から。 非破壊的というのは、元々の DataFrame には変更を加えることがないということ。 つまり、特定のカラムを取り除いた新しい DataFrame オブジェクトを取得することになる。

非破壊的にカラムを取り除くときは DataFrame#drop() メソッドを用いる。 例えば country カラムを取り除いてみよう。

>>> df.drop('country', axis=1)
   id   name gender       birth
0   1  Alice      F  1980-01-01
1   2    Bob      M  2000-03-03
2   3  Carol      M  1990-02-02

うまくいった。

DataFrame#drop() メソッドにはリストを渡して複数のカラム名を指定することもできる。 例えば countrybirth という二つのカラムを取り除いてみよう。

>>> df.drop(['country', 'birth'], axis=1)
   id   name gender
0   1  Alice      F
1   2    Bob      M
2   3  Carol      M

ばっちり。

破壊的にカラムを取り除く

続いては破壊的にカラムを取り除く方法について。 破壊的ということは、つまり元々の DataFrame オブジェクト自体が変更されるということ。

破壊的にカラムを削除するには次のようにする。 具体的には del 文で DataFrame のカラムを指定すれば良い。

>>> del df['country']

確認すると、元々の DataFrame オブジェクトからカラムが削除されていることが分かる。

>>> df
   id   name gender       birth
0   1  Alice      F  1980-01-01
1   2    Bob      M  2000-03-03
2   3  Carol      M  1990-02-02

ちなみに、このやり方では複数のカラムを同時に削除することはできないようだ。

>>> del df[['birth', 'gender']]
...
TypeError: '['birth', 'gender']' is an invalid key

いじょう。

めでたしめでたし。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Python: pandas で欠損値の有無を調べる

今回はかなり小ネタだけど pandas の DataFrame オブジェクト内に欠損値を含むカラムがあるか調べる方法について。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.13.3
BuildVersion:   17D102
$ python -V
Python 3.6.4
$ pip list --format=columns | grep pandas
pandas          0.22.0 

ひとまず環境には pandas と numpy をインストールしておく。

$ pip install pandas numpy

サンプルとなる DataFrame を次のように用意する。 Python が標準で持っている None や pandas の NaT と numpy の NaN を織り交ぜてみた。 ちなみに NaT は "Not a Time" で、NaN は "Not a Number" を表している。

>>> import pandas as pd
>>> import numpy as np
>>> from datetime import date
>>> columns = ['id', 'name', 'gender', 'birth']
>>> data = [
...     (1, 'Alice', None, date(1980, 1, 1)),
...     (2, 'Bob', 'M', pd.NaT),
...     (np.NaN, None, None, date(1990, 2, 2)),
... ]
>>> df = pd.DataFrame(data, columns=columns)

上記で作った DataFrame は、次のような感じで欠損値が所々にある。

>>> df
    id   name gender       birth
0  1.0  Alice   None  1980-01-01
1  2.0    Bob      M         NaT
2  NaN   None   None  1990-02-02

その上で欠損値になっているところを調べるには DataFrame#isnull() メソッドを使うと良い。 上記のそれぞれの欠損値があるところが真偽値の True として表される。

>>> df.isnull()
      id   name  gender  birth
0  False  False    True  False
1  False  False   False   True
2   True   True    True  False

各カラムにどれだけ欠損値が含まれているかを確認するには DataFrame#sum() メソッドを使うと良い。

>>> df.isnull().sum()
id        1
name      1
gender    2
birth     1
dtype: int64

カラムに欠損値があることが分かったら、欠損している行について取り除くなり適切に穴埋めすることになる。

いじょう。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Python: pandas で対応関係を渡して値を変換する

例えばデータセットの中のカラムが文字列型なんかで入っていると、それを数値に直したくなることはよくあると思う。 今回はそれを pandas でやる方法について書く。 結論から先に書くと Series オブジェクトにある map() メソッドを使うと上手くいく。

使った環境は次の通り。

$ python -V
Python 3.6.4
$ pip list --format=columns | grep -i pandas
pandas             0.22.0

まずは pandas をインストールして Python の REPL を起動しておく。

$ pip install pandas
$ python

サンプルになる DataFrame オブジェクトを用意する。 各行には何かの商品のグレードと価格に関する情報が入っているイメージ。

>>> import pandas as pd
>>> data = [
...     (1, 'C', 1000),
...     (2, 'B', 2500),
...     (3, 'A', 5000),
...     (4, 'S', 10000),
... ]
>>> columns = ['id', 'grade', 'price']
>>> df = pd.DataFrame(data, columns=columns)
>>> df
   id grade  price
0   1     C   1000
1   2     B   2500
2   3     A   5000
3   4     S  10000

この中で grade カラムは文字列で情報が格納されているため扱いにくい。 そこで、このカラムに入っている内容を数値に変換したいと考える。

>>> df['grade']
0    C
1    B
2    A
3    S
Name: grade, dtype: object

対応関係については次のように S ~ C に対して数値を割り振ることにする。

>>> grade_mapping = {
...     'C': 1,
...     'B': 2,
...     'A': 3,
...     'S': 4,
... }

次の通り DataFrame オブジェクトの中で各カラムは Series オブジェクトとして管理されている。

>>> type(df['grade'])
<class 'pandas.core.series.Series'>

この場合、次のように変換したいカラムに対して Series#map() メソッドを使うと上手くいく。

>>> df['grade'].map(grade_mapping)
0    1
1    2
2    3
3    4
Name: grade, dtype: int64

変換後のカラムを元の DataFrame の別のカラムと一緒に使いたいときは、次のようにすると良い。 以下のコードでは既存の DataFramegrade_norm という名前で数値に正規化したカラムを追加している。

>>> df.assign(grade_norm = df['grade'].map(grade_mapping))
   id grade  price  grade_norm
0   1     C   1000           1
1   2     B   2500           2
2   3     A   5000           3
3   4     S  10000           4

上記は DataFrame を非破壊的に操作する (新しいオブジェクトを作る) やり方になる。 もし、既存の DataFrame を破壊的に更新したいときは次のように既存のカラムに代入してしまえば良い。

>>> df['grade'] = df['grade'].map(grade_mapping)
>>> df
   id  grade  price
0   1      1   1000
1   2      2   2500
2   3      3   5000
3   4      4  10000

いじょう。

めでたしめでたし。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Python: pandas で特定の値がいくつあるか数える

今回は pandas で特定の値がいくつ数える方法について。 結論から先に書いてしまうと value_counts() という専用のメソッドがあるよ、という話。

使った環境は次の通り。

$ python -V
Python 3.6.4
$ pip list --format=columns | grep -i pandas
pandas             0.22.0

まずは pandas をインストールして Python の REPL を起動しておく。

$ pip install pandas
$ python

続いて、サンプル用のデータフレームを作っておく。 何かのグレードが英字一文字で格納されたデータのようだ。

>>> import pandas as pd
>>> data = [
...     "A",
...     "B",
...     "B",
...     "C",
... ]
>>> columns = ['grade']
>>> df = pd.DataFrame(data, columns=columns)

上記について特定の値がいくつあるか数えるとき、これまでは groupby() してから size() していた。

>>> df.groupby('grade').size()
grade
A    1
B    2
C    1
dtype: int64

もちろん、上記も間違いではないんだけど pandas の Series オブジェクトには value_counts() という専用のメソッドがある。

>>> df['grade'].value_counts()
B    2
A    1
C    1
Name: grade, dtype: int64

得られる結果はどちらも変わらない。 返り値は Series オブジェクトになる。

>>> s = df['grade'].value_counts()
>>> type(s)
<class 'pandas.core.series.Series'>

ちなみに、上記で得られた SeriesDataFrame に変換するには reset_index() メソッドを使うと便利っぽい。

>>> s.reset_index()
  index  grade
0     B      2
1     A      1
2     C      1
>>> type(s.reset_index())
<class 'pandas.core.frame.DataFrame'>

カラム名については name オプションで指定した方が良いかも。

>>> s.reset_index(name='count')
  index  count
0     B      2
1     A      1
2     C      1

いじょう。

めでたしめでたし。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

PySpark のスクリプトファイルで引数を扱う

今回は Apache Spark の Python インターフェースである PySpark について。 PySpark では定型的な作業についてはスクリプトファイル (*.py) にまとめて spark-submit コマンドで実行することになる。 その際に、動作に必要な引数をさばく方法について。 結論から先に書いてしまうと spark-submit コマンドでスクリプトファイルの後ろにアプリケーション用の引数を渡せば良いだけ。

使った環境は次の通り。 Apache Spark は YARN を使って分散環境を構築してある。

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.17.1.el7.x86_64
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161
Branch master
Compiled by user sameera on 2018-02-22T19:24:29Z
Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
Url git@github.com:sameeragarwal/spark.git
Type --help for more information.
$ hadoop version
Hadoop 2.8.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2
Compiled by jdu on 2017-12-05T03:43Z
Compiled with protoc 2.5.0
From source with checksum 9ff4856d824e983fa510d3f843e3f19d
This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar

Apache Spark で PySpark のスクリプトファイルを実行する

とりあえず、まずは引数とかに関係なく PySpark のスクリプトファイルを実行する方法から。 記述する上での注意点として、最も重要なのは pyspark コマンドで起動する REPL とは若干流儀が異なるということ。 具体的には REPL であれば最初から用意されているいくつかのインスタンス変数が用意されていない。 そのためスクリプトファイルの中でそれらを自分で作成しなければいけない。

以下のサンプルコードでは REPL であれば最初から用意されている SparkContextSparkSession のインスタンスを作っている。 そして、それらを作った上で SparkSQL を呼び出すという内容になっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    df = spark.sql('SELECT "Hello, World!" AS message')
    df.show()


if __name__ == '__main__':
    main()

この内容ではアプリケーションの引数がどうのとかは関係ない。

上記を適当な名前で保存して spark-submit コマンドで実行する。 今回はクラスタを YARN で管理しているので --master オプションに yarn を指定している。

$ spark-submit --master yarn example.py
...
+-------------+
|      message|
+-------------+
|Hello, World!|
+-------------+
...

うまくいけば上記のように SparkSQL で得られた DataFrame の内容がログの中に表示されるはず。

アプリケーションが動作するための引数を扱う

続いてはアプリケーションが動作するのに必要な引数をどうやって渡すかについて。 これについては spark-submit コマンドのヘルプを見れば、それについて書いてある。

$ spark-submit --help 2>&1 | head -n 1
Usage: spark-submit [options] <app jar | python file> [app arguments]

スクリプトファイルの後ろにアプリケーション固有の引数を書けば良いらしい。

上記を確認するために、以下のようなサンプルコードを用意した。 このコードでは Spark 固有の機能は使っておらず、ただ単に sys.argv を出力している。 これによって Python のモジュールに渡されてきた引数の内容を表示できる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import sys

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    print('args:', sys.argv)


if __name__ == '__main__':
    main()

上記のスクリプトファイルを spark-submit で実行してみよう。 スクリプトファイルの後ろにはアプリケーションの引数に見立てた文字列をいくつか追加しておく。

$ spark-submit --master yarn example.py foo bar baz
...
args: ['/home/vagrant/example.py', 'foo', 'bar', 'baz']
...

ちゃんと spark-submit 自体の引数とは別に sys.argv に引数が渡っていることがわかる。

アプリケーション固有の引数をパースする

アプリケーションへの引数の渡し方が分かったので、あとはパースしてアプリケーションの中で使うだけ。 次のサンプルコードでは argparse モジュールを使って引数をパースして SparkSQL の中に内容を埋め込んでいる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import argparse

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    parser = argparse.ArgumentParser(description='pyspark app args example')
    parser.add_argument('--message', type=str, required=True, help='show message')
    args = parser.parse_args()

    sql = 'SELECT "{msg}" AS message'.format(msg=args.message)
    df = spark.sql(sql)
    df.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 上手くいけば表示される内容が引数で渡した内容になる。

$ spark-submit --master yarn example.py --message "Hello, PySpark"
...
+--------------+
|       message|
+--------------+
|Hello, PySpark|
+--------------+
...

ばっちり。

めでたしめでたし。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

PySpark の DataFrame を SparkSQL で操作する

Apache Spark には SQL の実行エンジンが組み込まれていて、そのインターフェースは SparkSQL と呼ばれている。 この機能を使うと Spark で主に扱われるデータ構造の DataFrame オブジェクトを SQL で操作できる。 今回は PySpark から DataFrame を SparkSQL で操作する方法について書いてみる。

使った環境は次の通り。 Spark は YARN の上で動作するように環境構築してある。 ただし、今回扱う範囲であれば別にスタンドアロンな環境でも動くはず。

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.17.1.el7.x86_64
$ hadoop version
Hadoop 2.8.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2
Compiled by jdu on 2017-12-05T03:43Z
Compiled with protoc 2.5.0
From source with checksum 9ff4856d824e983fa510d3f843e3f19d
This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar
$ spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161
Branch 
Compiled by user felixcheung on 2017-11-24T23:19:45Z
Revision 
Url 
Type --help for more information.

まずは PySpark のシェルを起動しておこう。

$ pyspark --master yarn

サンプルの DataFrame を用意する

まずは操作対象のサンプルとなる DataFrmae を用意する。 ここでは RDD から作ることにした。 中身はユーザの名前と年齢が記録されたものになる。

>>> rdd = sc.parallelize([
...   ('Alice', 20),
...   ('Bob', 25),
...   ('Carol', 30),
...   ('Daniel', 30),
... ])

これで RDD ができた。

>>> rdd.collect()
[('Alice', 20), ('Bob', 25), ('Carol', 30), ('Daniel', 30)]

DataFrame には RDD と違ってスキーマがあるので、次はそれを定義する。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> schema = StructType([
...   StructField('name', StringType(), False),
...   StructField('age', IntegerType(), False),
... ])

RDD とスキーマの両方が揃ったら DataFrame に変換しよう。

>>> df = spark.createDataFrame(rdd, schema)

この通り、ちゃんと DataFrame ができた。

>>> df.show()
+------+---+
|  name|age|
+------+---+
| Alice| 20|
|   Bob| 25|
| Carol| 30|
|Daniel| 30|
+------+---+

ちなみに、ちっこいデータであれば上記のようにする以外にも次のような作り方もある。 実は、これは正に SparkSQL の機能を使っている。

>>> df = spark.sql('SELECT "Hello, World!" AS message')
>>> df
DataFrame[message: string]
>>> df.show()
+-------------+
|      message|
+-------------+
|Hello, World!|
+-------------+

PySpark のシェルにデフォルトで用意されている spark 変数は SparkSession のインスタンスになっている。 このインスタンスを通して SparkSQL の機能が利用できる。

>>> spark
<pyspark.sql.session.SparkSession object at 0x7fc2959daba8>

SparkSQL で DataFrame を操作する

それでは、先ほど作成した DaraFrame を早速 SparkSQL を使って操作してみる。 それには、まず DataFrame#registerTempTable() というメソッドを使う。 このメソッドを使うと DataFrame を SparkSQL が操作するテーブルとして登録できる。

>>> df.registerTempTable('users')

登録したら、あとは SparkSession#sql() メソッド経由で SQL から先ほどの DataFrame の内容を操作できるようになる。 試しに DataFrame を元に登録したテーブルから全レコードを取り出してみよう。

>>> df = spark.sql('SELECT * FROM users')
>>> df.show()
+------+---+
|  name|age|
+------+---+
| Alice| 20|
|   Bob| 25|
| Carol| 30|
|Daniel| 30|
+------+---+

ばっちり取り出せた。

上記を見て何となく分かるかもしれないけど SparkSQL を使って得られる結果は、やっぱり DataFrame になっている。 ここでは WHERE 句を使って先ほどの DataFrame から絞り込んだ結果を、また新たな DataFrame として取得していることになる。

>>> df = spark.sql('SELECT * FROM users WHERE age < 30')
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 25|
+-----+---+

つまり SparkSQL を使って得た DataFrame をさらに登録して・・・という感じでアドホックに分析を進められる。 さっき取り出した 30 歳未満だけに限定したユーザの DataFrame をテーブルとして登録してみよう。

>>> df.registerTempTable('users_age_under_30')

この通り、ちゃんと登録できた。

>>> df = spark.sql('SELECT * FROM users_age_under_30')
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 25|
+-----+---+

テーブルの揮発性

ちなみに DataFrame#registerTempTable() で登録した内容はメソッド名の通り一次的なものになっている。 そのため PySpark のシェルを終了すると消える。

>>> exit()

一旦シェルを終了してから、再度 PySpark のシェルを立ち上げてテーブルを参照してみることにしよう。

$ pyspark --master yarn

この通り、そんなテーブルはないよと言われる。

>>> spark.sql('SELECT * FROM users').show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/session.py", line 603, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Table or view not found: users; line 1 pos 14'

そのため、何度も使いたいものについては何らかの形で何処かに書き出しておく必要がある。 書き込む先はローカルのディスクなり HDFS なり S3 なり、自分の使っているファイルシステム上になる。

SparkSQL を使わない場合との比較

ちなみに DataFrame は、もちろん SparkSQL を使わずに操作することもできる。 次は SparkSQL を使う場合と使わない場合を軽く比較してみる。

まずは、改めて DataFrame を用意しておく。

>>> users_df = spark.sql('SELECT * FROM users')

PySpark のシェルを終了した場合にはもう一度作り直そう。

SparkSQL を使わずに DataFrame の操作をするときは pyspark.sql.functions を使うことが多いと思う。 とはいえパッケージ名からして sql が入っているんだけど。

>>> from pyspark.sql import functions as F

試しに年齢の平均値を出してみる。 DataFrame#select() で年齢のカラムに pyspark.sql.functions.avg() 関数を適用する。

>>> users_age_avg_df = df.select(F.avg(users_df.age))
>>> users_age_avg_df
DataFrame[avg(age): double]

確認すると、たしかに平均が計算できた。

>>> users_age_avg_df.show()
+--------+
|avg(age)|
+--------+
|   26.25|
+--------+

ちなみに、カラム名を指定するときは Column#alias() メソッドを使う。

>>> users_df.select(F.avg(users_df.age).alias('age_avg')).show()
+-------+
|age_avg|
+-------+
|  26.25|
+-------+

SparkSQL を使うときはこんな感じ。 特に説明は不要だと思う。

>>> spark.sql('SELECT AVG(age) AS age_avg FROM users').show()
+-------+
|age_avg|
+-------+
|  26.25|
+-------+

こうして見ると SparkSQL を使わない場合でも、それに至るまでに扱う API 自体は近いことが分かる。

次は年齢ごとの人数を数えてみよう。 まずは DataFrame#groupBy() メソッドで集約に使うカラムを指定する。

>>> gd = users_df.groupBy(users_df.age)

返ってくるのは GroupedData になる。

>>> gd
<pyspark.sql.group.GroupedData object at 0x1720ed0>

集約した結果をカウントするために GroupedData#count() メソッドを使うと DataFrame が返る。

>>> df = gd.count()
>>> df
DataFrame[age: int, count: bigint]

これで各年齢の人数が分かった。

>>> df.show()
+---+-----+                                                                     
|age|count|
+---+-----+
| 20|    1|
| 25|    1|
| 30|    2|
+---+-----+

SparkSQL を使うときはこんな感じ。 見慣れた SQL そのままだ。

>>> spark.sql('SELECT age, COUNT(1) AS count FROM users GROUP BY age').show()
+---+-----+                                                                     
|age|count|
+---+-----+
| 20|    1|
| 25|    1|
| 30|    2|
+---+-----+

まとめ

今回は PySpark の DataFrame オブジェクトを SparkSQL から操作してみた。 データエンジニアリングの世界では、色々なプログラミング言語や環境、そして API が登場する。 そうした中 Apache Spark ではデータ分析における共通言語ともいえる SQL を使ってデータ構造を操作する選択肢があることはありがたい。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集