CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Apache Spark のパーティションは要素が空になるときがある

PySpark とたわむれていて、なんかたまにエラーになるなーと思って原因を調べて分かった話。 最初、パーティションの中身は空になる場合があるとは思っていなかったので、結構おどろいた。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_121
Branch
Compiled by user  on 2019-05-01T05:08:38Z
Revision
Url
Type --help for more information.
$ python -V
Python 3.7.4
$ java -version
openjdk version "12.0.1" 2019-04-16
OpenJDK Runtime Environment (build 12.0.1+12)
OpenJDK 64-Bit Server VM (build 12.0.1+12, mixed mode, sharing)

下準備

下準備として PySpark をインストールしたら REPL を起動しておく。 今回の検証に関しては分散処理をしないローカルモードでも再現できる。

$ pip install pyspark
$ pyspark

サンプルデータを用意する

例えば SparkSession#range() を使ってサンプルの DataFrame オブジェクトを作る。

>>> df = spark.range(10)

中身は bigint 型の連番が格納されている。

>>> df
DataFrame[id: bigint]
>>> df.show(truncate=False)
+---+
|id |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
|5  |
|6  |
|7  |
|8  |
|9  |
+---+

今回使う環境ではこの DataFrame は 4 つのパーティションに分けて処理されることが分かる。 パーティションというのは Apache Spark が内部的に RDD (Resilient Distributed Dataset) を処理する際の分割数を指している。 RDD は Apache Spark の最も低レイヤなデータ表現で、DataFrame も最終的には RDD に変換されて処理される。

>>> df.rdd.getNumPartitions()
4

試しにパーティションに入っている要素の数をカウントしてみることにしよう。 次のような関数を用意する。

>>> def size_of_partition(map_of_rows):
...     """パーティションの要素の数を計算する関数"""
...     list_of_rows = list(map_of_rows)
...     size_of_list = len(list_of_rows)
...     return [size_of_list]
...

これを RDD#mapPartitions() 経由で呼び出す。 これでパーティションの中の要素の数をカウントできる。

>>> df.rdd.mapPartitions(size_of_partition).collect()
[2, 3, 2, 3]

各パーティションには 2 ないし 3 の要素が入っているようだ。

意図的にパーティションを空にしてみる

続いては、意図的にパーティションの中身をスカスカにするためにパーティションの分割数を増やしてみよう。 先ほど 4 だった分割数を 20 まで増やしてみる。 パーティションの分割数を増やすには RDD#repartition() が使える。

>>> reparted_rdd = df.rdd.repartition(20)
>>> reparted_rdd.getNumPartitions()
20

この状態でパーティションの要素の数をカウントすると、次のようになった。 要素の数として 0 が登場していることから、パーティションによっては中身が空なことが分かる。

>>> reparted_rdd.mapPartitions(size_of_partition).collect()
[0, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 3, 0, 0, 0, 0, 0, 0]

意外だったのは、要素数を均すリバランスがされていないこと。

ちなみに先ほど使った RDD#repartition()RDD#coalesce() をオプション shuffle=True で呼び出した場合と等価なようだ。 オプション shuffle=True は要素の順序を保持しないことを表している。

>>> df.rdd.coalesce(20, shuffle=True)  # df.rdd.repartition(20) と等価

ちなみに、要素の順序を保持したままパーティションを拡張することはできない。

>>> unshuffled_reparted_rdd = df.rdd.coalesce(20, shuffle=False)
>>> unshuffled_reparted_rdd.getNumPartitions()
4

オプションの shuffleFalse だとパーティションの分割数が増えていないことが分かる。

ようするにパーティションの分割数を増やしたいときは、要素の順序が必ず入れ替わると考えた方が良い。 先ほどパーティションを増やした RDD も、確認すると順番が入れ替わっている。

>>> reparted_rdd.map(lambda x: x).collect()
[Row(id=0), Row(id=1), Row(id=7), Row(id=8), Row(id=9), Row(id=5), Row(id=6), Row(id=2), Row(id=3), Row(id=4)]

RDD のままでもいいけど、ちょっと分かりにくいかもしれないので DataFrame に直すとこんな感じ。

>>> reparted_rdd.map(lambda x: x).toDF(df.schema).show(truncate=False)
+---+
|id |
+---+
|0  |
|1  |
|7  |
|8  |
|9  |
|5  |
|6  |
|2  |
|3  |
|4  |
+---+

いじょう。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム