CUBE SUGAR CONTAINER

技術系のこと書きます。

PySpark の UDF (User Defined Function) を試す

今回は PySpark の UDF (User Defined Function) 機能を使ってみる。 UDF というのはユーザが定義した関数を使って Spark クラスタで分散処理をするための機能になっている。 柔軟に処理を記述できるメリットがある一方で、パフォーマンスには劣るというデメリットもある。 この特性は、ユーザが定義した処理をワーカーに配布した上で Python インタプリタに解釈させる特性に由来している。 今回は、そんな UDF を DataFrame API と Spark SQL という二つの API を通して使ってみることにした。

使った環境は次の通り。 クラスタは YARN で管理している。

$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161
Branch 
Compiled by user felixcheung on 2017-11-24T23:19:45Z
Revision 
Url 
Type --help for more information.

下準備

まずは UDF で処理する DataFrame オブジェクトを用意しよう。

最初に、ユーザの情報を模した RDD を定義する。

>>> rdd = sc.parallelize([
...   ('Alice', 20),
...   ('Bob', 25),
...   ('Carol', 30),
... ])

続いて、上記 RDD のスキーマを定義する。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> schema = StructType([
...   StructField('name', StringType(), False),
...   StructField('age', IntegerType(), False),
... ])

RDD とスキーマから DataFrame を生成する。

>>> df = spark.createDataFrame(rdd, schema)

また、後ほど Spark SQL から操作する場合のことを考えて、この DataFrame をテーブルとして扱えるようにしておこう。

>>> df.registerTempTable('users')

UDF となる関数を定義する

続いて UDF にする関数を定義する。 これは、何の変哲もない Python の関数でしかない。 引数としてカラムの値を受け取って、何らかの加工したカラムの値を返すことになる。

>>> def double(column):
...     return column * 2
... 

今回のサンプルコードでは、受け取った引数を 2 倍にして返すという単純なものにした。

DataFrame API から UDF を使う

まずは DataFrame API で UDF を使う方法から。 これには pyspark.sql.functions.udf() という関数を使う。

>>> from pyspark.sql.functions import udf

上記の udf() 関数を使って、先ほど定義した関数をラップする。 このラップした udf_double() 関数が UDF として動作する。

>>> udf_double = udf(double)

後は pyspark.sql.functions にあるような関数と同じような使い勝手で UDF が使える。 例えばサンプルデータに UDF を使って age カラムの値を 2 倍してみよう。

>>> df.select(udf_double('age')).show()
+-----------+
|double(age)|
+-----------+
|         40|
|         50|
|         60|
+-----------+

上手くいったようだ。

上記だと、表示がちょっと分かりにくいかもしれないので他のカラムも同時に出力してみる。 カラムには alias() メソッドを使って名前が付けられる。

>>> df.select('*', udf_double('age').alias('doubled_age')).show()
+-----+---+-----------+
| name|age|doubled_age|
+-----+---+-----------+
|Alice| 20|         40|
|  Bob| 25|         50|
|Carol| 30|         60|
+-----+---+-----------+

Spark SQL から UDF を使う

続いては Spark SQL から UDF を使ってみる。

それには、まず spark.udf.register() 関数を使って定義した関数を UDF として登録する。

>>> spark.udf.register('udf_double', double)

あとは Spark SQL で処理する SQL 文の中で一般的な関数のように使うことができる。

>>> spark.sql('''
... SELECT
...   *,
...   udf_double(age) AS doubled_age
... FROM users
... ''').show()
+-----+---+-----------+
| name|age|doubled_age|
+-----+---+-----------+
|Alice| 20|         40|
|  Bob| 25|         50|
|Carol| 30|         60|
+-----+---+-----------+

ばっちり。

めでたしめでたし。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

初めてのSpark

初めてのSpark

  • 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2015/08/22
  • メディア: 大型本
  • この商品を含むブログ (4件) を見る