Apache Spark の Python 版インターフェースである PySpark で DataFrame オブジェクトにカラムを追加する方法について。 いくつかやり方があるので見ていく。 ちなみに DataFrame や、それを支える内部的な RDD はイミュータブル (不変) なオブジェクトになっている。 そのため、カラムを追加するときは既存のオブジェクトを変更するのではなく、新たなオブジェクトを作ることになる。
使った環境は次の通り。
$ sw_vers ProductName: Mac OS X ProductVersion: 10.14.6 BuildVersion: 18G87 $ python -V Python 3.7.4 $ pip list | grep -i pyspark pyspark 2.4.3
下準備
まずは PySpark のインタプリタを起動しておく。 今回に関しては分散処理はしないのでローカルモードで構わない。
$ pyspark
サンプルとしてユーザ情報を模したデータを作ってみる。 まずは、次のように RDD (Resilient Distributed Dataset) を作る。
>>> users_rdd = sc.parallelize([ ... ('Alice', 20), ... ('Bob', 25), ... ('Carol', 30), ... ('Daniel', 30), ... ])
上記にスキーマを定義して DataFrame に変換する。
>>> from pyspark.sql.types import StructType >>> from pyspark.sql.types import StructField >>> from pyspark.sql.types import StringType >>> from pyspark.sql.types import IntegerType >>> df_schema = StructType([ ... StructField('name', StringType(), False), ... StructField('age', IntegerType(), False), ... ]) >>> users_df = spark.createDataFrame(users_rdd, df_schema)
上手くいけば、次のようになる。
>>> users_df.show(truncate=False) +------+---+ |name |age| +------+---+ |Alice |20 | |Bob |25 | |Carol |30 | |Daniel|30 | +------+---+
今回はここに、年齢を倍にした double_age
というカラムを追加してみる。
SparkSQL を使ってカラムを追加する
まずは SparkSQL を使ってカラムを追加してみる。
先ほどの DataFrame を SparkSQL から操作できるように登録しておく。
>>> users_df.registerTempTable('users')
あるいは、以下のようにしても良い。
>>> users_df.createOrReplaceTempView('users')
既存のカラムに加えて年齢を倍にしたカラムを追加するように SQL を用意する。
>>> query = """ ... SELECT ... name, ... age, ... age * 2 AS double_age ... FROM users ... """
そして SparkSession#sql()
で実行する。
>>> new_users_df = spark.sql(query)
得られた DataFrame を見ると、ちゃんとカラムが新たに追加されている。
>>> new_users_df.show(truncate=False) +------+---+----------+ |name |age|double_age| +------+---+----------+ |Alice |20 |40 | |Bob |25 |50 | |Carol |30 |60 | |Daniel|30 |60 | +------+---+----------+
DataFrame API を使ってカラムを追加する
DataFrame に生えたメソッドを使ってカラムを追加する方法もある。 見栄えはだいぶ変わるけど、先ほどとやっていることは基本的に変わらない。
>>> new_users_df = users_df.withColumn('double_age', users_df.age * 2)
DataFrame API は、使っていくと「これ SQL 書いてるのと変わらなくね?」ってなってくる。 なので、個人的にはあまり出番がない。
>>> new_users_df.show(truncate=False) +------+---+----------+ |name |age|double_age| +------+---+----------+ |Alice |20 |40 | |Bob |25 |50 | |Carol |30 |60 | |Daniel|30 |60 | +------+---+----------+
RDD API を使ってカラムを追加する
最後に、Apache Spark の最もプリミティブなデータ表現である RDD の API を使って追加する方法について。 ただし、このやり方は UDF (User Defined Function) を使うので遅いはず。
まずは、次のように RDD を行単位で処理してカラムを追加する関数を用意する。
>>> def double_age(row): ... """年齢を倍にしたカラムを追加する関数""" ... return list(row) + [row['age'] * 2] ...
DataFrame の RDD に適用すると、次のようになる。
>>> new_users_rdd = users_df.rdd.map(double_age) >>> new_users_rdd.collect() [['Alice', 20, 40], ['Bob', 25, 50], ['Carol', 30, 60], ['Daniel', 30, 60]]
元の DataFrame に戻したいけど、そのままだとカラム名や型の情報がない。
>>> new_users_rdd.toDF().show(truncate=False) +------+---+---+ |_1 |_2 |_3 | +------+---+---+ |Alice |20 |40 | |Bob |25 |50 | |Carol |30 |60 | |Daniel|30 |60 | +------+---+---+
そこで、元あった DataFrame のスキーマを改変する形で新たな DataFrame のスキーマを定義する。
>>> new_schema_fields = users_df.schema.fields + [StructField('double_age', IntegerType(), False)] >>> new_schema = StructType(new_schema_fields)
用意したスキーマを使って DataFrame に変換する。
>>> new_user_df = new_users_rdd.toDF(new_schema)
これでカラム名や型の情報がちゃんとした DataFrame になった。
>>> new_user_df.show(truncate=False) +------+---+----------+ |name |age|double_age| +------+---+----------+ |Alice |20 |40 | |Bob |25 |50 | |Carol |30 |60 | |Daniel|30 |60 | +------+---+----------+
補足
ちなみにカラムを削除したいときは、次のように DataFrame API で DataFrame#drop()
を呼び出せば良い。
>>> new_user_df.drop('age').show(truncate=False) +------+----------+ |name |double_age| +------+----------+ |Alice |40 | |Bob |50 | |Carol |60 | |Daniel|60 | +------+----------+
いじょう。
入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム
- 作者: Tomasz Drabas,Denny Lee,Sky株式会社玉川竜司
- 出版社/メーカー: オライリージャパン
- 発売日: 2017/11/22
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (3件) を見る
スマートPythonプログラミング: Pythonのより良い書き方を学ぶ
- 作者: もみじあめ
- 発売日: 2016/03/12
- メディア: Kindle版
- この商品を含むブログ (1件) を見る