今回は PySpark の UDF (User Defined Function) 機能を使ってみる。 UDF というのはユーザが定義した関数を使って Spark クラスタで分散処理をするための機能になっている。 柔軟に処理を記述できるメリットがある一方で、パフォーマンスには劣るというデメリットもある。 この特性は、ユーザが定義した処理をワーカーに配布した上で Python インタプリタに解釈させる特性に由来している。 今回は、そんな UDF を DataFrame API と Spark SQL という二つの API を通して使ってみることにした。
使った環境は次の通り。 クラスタは YARN で管理している。
$ pyspark --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.1 /_/ Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161 Branch Compiled by user felixcheung on 2017-11-24T23:19:45Z Revision Url Type --help for more information.
下準備
まずは UDF で処理する DataFrame オブジェクトを用意しよう。
最初に、ユーザの情報を模した RDD を定義する。
>>> rdd = sc.parallelize([ ... ('Alice', 20), ... ('Bob', 25), ... ('Carol', 30), ... ])
続いて、上記 RDD のスキーマを定義する。
>>> from pyspark.sql.types import StructType >>> from pyspark.sql.types import StructField >>> from pyspark.sql.types import StringType >>> from pyspark.sql.types import IntegerType >>> schema = StructType([ ... StructField('name', StringType(), False), ... StructField('age', IntegerType(), False), ... ])
RDD とスキーマから DataFrame を生成する。
>>> df = spark.createDataFrame(rdd, schema)
また、後ほど Spark SQL から操作する場合のことを考えて、この DataFrame をテーブルとして扱えるようにしておこう。
>>> df.registerTempTable('users')
UDF となる関数を定義する
続いて UDF にする関数を定義する。 これは、何の変哲もない Python の関数でしかない。 引数としてカラムの値を受け取って、何らかの加工したカラムの値を返すことになる。
>>> def double(column): ... return column * 2 ...
今回のサンプルコードでは、受け取った引数を 2 倍にして返すという単純なものにした。
DataFrame API から UDF を使う
まずは DataFrame API で UDF を使う方法から。
これには pyspark.sql.functions.udf()
という関数を使う。
>>> from pyspark.sql.functions import udf
上記の udf()
関数を使って、先ほど定義した関数をラップする。
このラップした udf_double()
関数が UDF として動作する。
>>> udf_double = udf(double)
後は pyspark.sql.functions
にあるような関数と同じような使い勝手で UDF が使える。
例えばサンプルデータに UDF を使って age
カラムの値を 2 倍してみよう。
>>> df.select(udf_double('age')).show() +-----------+ |double(age)| +-----------+ | 40| | 50| | 60| +-----------+
上手くいったようだ。
上記だと、表示がちょっと分かりにくいかもしれないので他のカラムも同時に出力してみる。
カラムには alias()
メソッドを使って名前が付けられる。
>>> df.select('*', udf_double('age').alias('doubled_age')).show() +-----+---+-----------+ | name|age|doubled_age| +-----+---+-----------+ |Alice| 20| 40| | Bob| 25| 50| |Carol| 30| 60| +-----+---+-----------+
Spark SQL から UDF を使う
続いては Spark SQL から UDF を使ってみる。
それには、まず spark.udf.register()
関数を使って定義した関数を UDF として登録する。
>>> spark.udf.register('udf_double', double)
あとは Spark SQL で処理する SQL 文の中で一般的な関数のように使うことができる。
>>> spark.sql(''' ... SELECT ... *, ... udf_double(age) AS doubled_age ... FROM users ... ''').show() +-----+---+-----------+ | name|age|doubled_age| +-----+---+-----------+ |Alice| 20| 40| | Bob| 25| 50| |Carol| 30| 60| +-----+---+-----------+
ばっちり。
めでたしめでたし。
入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム
- 作者: Tomasz Drabas,Denny Lee,Sky株式会社玉川竜司
- 出版社/メーカー: オライリージャパン
- 発売日: 2017/11/22
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (3件) を見る
Sparkによる実践データ解析 ―大規模データのための機械学習事例集
- 作者: Sandy Ryza,Uri Laserson,Sean Owen,Josh Wills,石川有,Sky株式会社玉川竜司
- 出版社/メーカー: オライリージャパン
- 発売日: 2016/01/23
- メディア: 大型本
- この商品を含むブログ (4件) を見る
- 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia,Sky株式会社玉川竜司
- 出版社/メーカー: オライリージャパン
- 発売日: 2015/08/22
- メディア: 大型本
- この商品を含むブログ (4件) を見る