PySpark とたわむれていて、なんかたまにエラーになるなーと思って原因を調べて分かった話。 最初、パーティションの中身は空になる場合があるとは思っていなかったので、結構おどろいた。
使った環境は次の通り。
$ sw_vers ProductName: Mac OS X ProductVersion: 10.14.6 BuildVersion: 18G87 $ pyspark --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.3 /_/ Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_121 Branch Compiled by user on 2019-05-01T05:08:38Z Revision Url Type --help for more information. $ python -V Python 3.7.4 $ java -version openjdk version "12.0.1" 2019-04-16 OpenJDK Runtime Environment (build 12.0.1+12) OpenJDK 64-Bit Server VM (build 12.0.1+12, mixed mode, sharing)
下準備
下準備として PySpark をインストールしたら REPL を起動しておく。 今回の検証に関しては分散処理をしないローカルモードでも再現できる。
$ pip install pyspark $ pyspark
サンプルデータを用意する
例えば SparkSession#range()
を使ってサンプルの DataFrame オブジェクトを作る。
>>> df = spark.range(10)
中身は bigint 型の連番が格納されている。
>>> df DataFrame[id: bigint] >>> df.show(truncate=False) +---+ |id | +---+ |0 | |1 | |2 | |3 | |4 | |5 | |6 | |7 | |8 | |9 | +---+
今回使う環境ではこの DataFrame は 4 つのパーティションに分けて処理されることが分かる。 パーティションというのは Apache Spark が内部的に RDD (Resilient Distributed Dataset) を処理する際の分割数を指している。 RDD は Apache Spark の最も低レイヤなデータ表現で、DataFrame も最終的には RDD に変換されて処理される。
>>> df.rdd.getNumPartitions()
4
試しにパーティションに入っている要素の数をカウントしてみることにしよう。 次のような関数を用意する。
>>> def size_of_partition(map_of_rows): ... """パーティションの要素の数を計算する関数""" ... list_of_rows = list(map_of_rows) ... size_of_list = len(list_of_rows) ... return [size_of_list] ...
これを RDD#mapPartitions()
経由で呼び出す。
これでパーティションの中の要素の数をカウントできる。
>>> df.rdd.mapPartitions(size_of_partition).collect() [2, 3, 2, 3]
各パーティションには 2 ないし 3 の要素が入っているようだ。
意図的にパーティションを空にしてみる
続いては、意図的にパーティションの中身をスカスカにするためにパーティションの分割数を増やしてみよう。
先ほど 4
だった分割数を 20
まで増やしてみる。
パーティションの分割数を増やすには RDD#repartition()
が使える。
>>> reparted_rdd = df.rdd.repartition(20) >>> reparted_rdd.getNumPartitions() 20
この状態でパーティションの要素の数をカウントすると、次のようになった。
要素の数として 0
が登場していることから、パーティションによっては中身が空なことが分かる。
>>> reparted_rdd.mapPartitions(size_of_partition).collect() [0, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 3, 0, 0, 0, 0, 0, 0]
意外だったのは、要素数を均すリバランスがされていないこと。
ちなみに先ほど使った RDD#repartition()
は RDD#coalesce()
をオプション shuffle=True
で呼び出した場合と等価なようだ。
オプション shuffle=True
は要素の順序を保持しないことを表している。
>>> df.rdd.coalesce(20, shuffle=True) # df.rdd.repartition(20) と等価
ちなみに、要素の順序を保持したままパーティションを拡張することはできない。
>>> unshuffled_reparted_rdd = df.rdd.coalesce(20, shuffle=False) >>> unshuffled_reparted_rdd.getNumPartitions() 4
オプションの shuffle
が False
だとパーティションの分割数が増えていないことが分かる。
ようするにパーティションの分割数を増やしたいときは、要素の順序が必ず入れ替わると考えた方が良い。 先ほどパーティションを増やした RDD も、確認すると順番が入れ替わっている。
>>> reparted_rdd.map(lambda x: x).collect() [Row(id=0), Row(id=1), Row(id=7), Row(id=8), Row(id=9), Row(id=5), Row(id=6), Row(id=2), Row(id=3), Row(id=4)]
RDD のままでもいいけど、ちょっと分かりにくいかもしれないので DataFrame に直すとこんな感じ。
>>> reparted_rdd.map(lambda x: x).toDF(df.schema).show(truncate=False) +---+ |id | +---+ |0 | |1 | |7 | |8 | |9 | |5 | |6 | |2 | |3 | |4 | +---+
いじょう。
入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム
- 作者: Tomasz Drabas,Denny Lee,Sky株式会社玉川竜司
- 出版社/メーカー: オライリージャパン
- 発売日: 2017/11/22
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (3件) を見る
スマートPythonプログラミング: Pythonのより良い書き方を学ぶ
- 作者: もみじあめ
- 発売日: 2016/03/12
- メディア: Kindle版
- この商品を含むブログ (1件) を見る