CUBE SUGAR CONTAINER

技術系のこと書きます。

Polars と PySpark / スケールアップとスケールアウト

これは Polars Advent Calendar 2023 の 4 日目に対応したエントリです。

qiita.com


Polars と PySpark は操作方法が似ていると言われることがある。 そこで、今回はいくつかの基本的な操作について、実際に両者でコードを比較してみた。 また、それぞれの思想的な違いなどについても私見を述べる。

最初に書いたとおり、これは Polars のアドベントカレンダーのために書かれたエントリになる。 そのため、まずは簡単に PySpark 自体の説明をしておきたい。

PySpark というのは、Apache Spark という分散処理フレームワークを Python から操作するためのインターフェイスになる。 Apache Spark 自体は Scala で書かれているため、py4j というライブラリを使って Python バインディングを提供している。

Polars と比較対象になることからも分かるように、PySpark にもデータフレームの API が存在している。 ただし、データフレームは最初からあったわけではなく、バージョン 1.3 から追加された。 Apache Spark は RDD (Resilient Distributed Dataset; 耐障害性分散データセット) と呼ばれるデータ構造を動作の基本的な単位としている。 そのため、RDD という低レベル API に対する高レベル API としてデータフレームも作られている。

なお、Apache Spark は、あくまで複数台の計算機で構成されたクラスタ (以下、計算機クラスタ) の上で分散処理をするための仕組みに過ぎない。 そのため、計算機クラスタを用意して、それらのリソースを管理する部分は別でやる必要がある。 計算機クラスタのリソースを管理する部分は、次のような選択肢がある 1

  • Standalone Server 2
  • Hadoop YARN
  • Kubernetes

なお、上記の仕組みを使わずに、シングルノード (ローカルモード) で利用することもできる。 そのため、今回のエントリもローカルモードを使って検証していく。 ただし、シングルノードでの利用は、テストを目的とする以外にはほとんどメリットがない。 また、ローカルモードで動作したコードが分散処理させたときに動かないパターンもあるので注意が必要になる 3

もくじ

思想の違いについて

Polars と PySpark の思想の違いは、タイトルにもあるとおりスケールアップとスケールアウトのアプローチで説明できる。

PySpark は、大きなデータであっても複数の計算機が分担しながら処理をする。 それぞれの計算機が担当するのは元のデータの一部分なので、個々の計算機が持つ CPU コアやメモリの上限に影響を受けにくい (スケールアウト)。 一方で、事前に計算機クラスタを用意する必要があったり、分散処理のオーバーヘッド 4 が存在するといったデメリットもある。

Polars は、それ単体では基本的にシングルノードでしか処理ができない。 そのため、大きなデータを扱うときは、相応に CPU コアやメモリをたくさん積んだ計算機を使うことになる (スケールアップ)。 事前の準備が簡単で、処理のオーバーヘッドも少ない一方、処理できるデータの規模については劣る 5

とはいえ、本当に Apache Spark のような分散処理をしなければ対応できないデータが世の中にどれだけあるのか、という話もある。 Polars や DuckDB といった、シングルノードでの分析におけるスケーラビリティを改善するソフトウェアが台頭しつつあるのは、その流れを反映してのことだろう。

そして、Polars と PySpark は「どちらを使うか」という二者択一とは限らない。 たとえば PySpark で記述した分散処理において、個々の計算機で実行する処理の中身が Polars になっている、というパターンもありうる。 特に UDF (User Defined Function) を書くような場面では、分散処理していても個々の計算機の CPU 資源を有効に使えていない、というケースは十分に考えられる。

下準備

前置きが長くなってしまったけど、ここからは実際に Polars と PySpark を使っていく。

使った環境は次のとおり。

$ cat /etc/lsb-release 
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=22.04
DISTRIB_CODENAME=jammy
DISTRIB_DESCRIPTION="Ubuntu 22.04.3 LTS"
$ uname -srm
Linux 5.15.0-89-generic x86_64
$ python -V
Python 3.10.12
$ pip list | egrep "(polars|pyspark|pandas|scikit-learn)"
pandas          2.1.3
polars          0.19.18
pyspark         3.5.0
scikit-learn    1.3.2

まずは必要なパッケージをインストールする。 前述したとおり Apache Spark は Scala で書かれているため、PySpark の動作には Java のランタイムが必要になる。

$ sudo apt-get install python3-venv openjdk-8-jdk

Python の仮想環境を用意してライブラリをインストールする。

$ python3 -m venv venv
$ source venv/bin/activate
(venv) $ pip install "polars[pyarrow]" pyspark pandas scikit-learn

Python のインタプリタを起動する。

(venv) $ python

あらかじめ scikit-learn を使って OpenML から適当なデータセットをダウンロードしておく。 ここでは Diamonds データセットにした。 この段階では Pandas のデータフレームとして読み込んでいる。

>>> from sklearn.datasets import fetch_openml
>>> df_pandas, _ = fetch_openml(
...     "diamonds",
...     version=1,
...     as_frame=True,
...     return_X_y=True,
...     parser="pandas"
... )
>>> df_pandas.head()
   carat      cut color clarity  depth  table     x     y     z
0   0.23    Ideal     E     SI2   61.5   55.0  3.95  3.98  2.43
1   0.21  Premium     E     SI1   59.8   61.0  3.89  3.84  2.31
2   0.23     Good     E     VS1   56.9   65.0  4.05  4.07  2.31
3   0.29  Premium     I     VS2   62.4   58.0  4.20  4.23  2.63
4   0.31     Good     J     SI2   63.3   58.0  4.34  4.35  2.75

そして、Pandas のデータフレームを、それぞれのフレームワークのデータフレームに変換しておこう 6

Polars

Polars では polars.from_pandas() という関数を使うことで、Pandas のデータフレームから Polars のデータフレームに変換できる。

>>> import polars as pl
>>> df_polars = pl.from_pandas(df_pandas)
>>> df_polars
shape: (53940, 9)
┌───────┬───────────┬───────┬─────────┬───┬───────┬──────┬──────┬──────┐
│ carat ┆ cut       ┆ color ┆ clarity ┆ … ┆ table ┆ x    ┆ y    ┆ z    │
│ ---   ┆ ---       ┆ ---   ┆ ---     ┆   ┆ ---   ┆ ---  ┆ ---  ┆ ---  │
│ f64   ┆ cat       ┆ cat   ┆ cat     ┆   ┆ f64   ┆ f64  ┆ f64  ┆ f64  │
╞═══════╪═══════════╪═══════╪═════════╪═══╪═══════╪══════╪══════╪══════╡
│ 0.23  ┆ Ideal     ┆ E     ┆ SI2     ┆ … ┆ 55.03.953.982.43 │
│ 0.21  ┆ Premium   ┆ E     ┆ SI1     ┆ … ┆ 61.03.893.842.31 │
│ 0.23  ┆ Good      ┆ E     ┆ VS1     ┆ … ┆ 65.04.054.072.31 │
│ 0.29  ┆ Premium   ┆ I     ┆ VS2     ┆ … ┆ 58.04.24.232.63 │
│ …     ┆ …         ┆ …     ┆ …       ┆ … ┆ …     ┆ …    ┆ …    ┆ …    │
│ 0.72  ┆ Good      ┆ D     ┆ SI1     ┆ … ┆ 55.05.695.753.61 │
│ 0.7   ┆ Very Good ┆ D     ┆ SI1     ┆ … ┆ 60.05.665.683.56 │
│ 0.86  ┆ Premium   ┆ H     ┆ SI2     ┆ … ┆ 58.06.156.123.74 │
│ 0.75  ┆ Ideal     ┆ D     ┆ SI2     ┆ … ┆ 55.05.835.873.64 │
└───────┴───────────┴───────┴─────────┴───┴───────┴──────┴──────┴──────┘

PySpark

次に PySpark の場合、まずは SparkSession のインスタンスを作成する必要がある。

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()

作成した SparkSession のインスタンスで sparkContext というプロパティを参照しておこう。 すると、master=local[*] という値が確認できる。 これは、前述したシングルノードのローカルモードで動作していることを示している。

>>> spark.sparkContext
<SparkContext master=local[*] appName=pyspark-shell>

PySpark では SparkSession#createDataFrame() というメソッドを使うと Pandas のデータフレームを PySpark のデータフレームに変換できる。

>>> df_spark = spark.createDataFrame(df_pandas)

ただし、得られたデータフレームの変数を参照しても、中身が表示されない。 これは PySpark が遅延評価を操作の基本としている点が関係している。 まだ、この段階では「Pandas のデータフレームを PySpark のデータフレームに変換する」という処理を、これからすることしか決まっていない。 つまり、データフレームの内容は、実際に評価を実行しない限りは得られない 7

>>> df_spark
DataFrame[carat: double, cut: string, color: string, clarity: string, depth: double, table: double, x: double, y: double, z: double]

試しに head() メソッドを実行してみよう。 すると、評価が実行されて Row クラスのインスタンスが入ったリストとして結果が得られる。

>>> df_spark.head(n=5)
[Row(carat=0.23, cut='Ideal', color='E', clarity='SI2', depth=61.5, table=55.0, x=3.95, y=3.98, z=2.43), Row(carat=0.21, cut='Premium', color='E', clarity='SI1', depth=59.8, table=61.0, x=3.89, y=3.84, z=2.31), Row(carat=0.23, cut='Good', color='E', clarity='VS1', depth=56.9, table=65.0, x=4.05, y=4.07, z=2.31), Row(carat=0.29, cut='Premium', color='I', clarity='VS2', depth=62.4, table=58.0, x=4.2, y=4.23, z=2.63), Row(carat=0.31, cut='Good', color='J', clarity='SI2', depth=63.3, table=58.0, x=4.34, y=4.35, z=2.75)]

上記は見にくいので、結果を行単位で処理したいときはまだしも、中身を軽く見たいだけなら普段は show() メソッドを使った方が良いだろう。

>>> df_spark.show(n=5)
+-----+-------+-----+-------+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+----+----+----+
only showing top 5 rows

これで下準備が整った。

特定のカラムを取り出す

まずは特定のカラムを取り出してみよう。

Polars

Polars であれば特定のカラムを取り出すのに select() メソッドを使う。

>>> df_polars.select(["x", "y", "z"])
shape: (53_940, 3)
┌──────┬──────┬──────┐
│ x    ┆ y    ┆ z    │
│ ---  ┆ ---  ┆ ---  │
│ f64  ┆ f64  ┆ f64  │
╞══════╪══════╪══════╡
│ 3.953.982.43 │
│ 3.893.842.31 │
│ 4.054.072.31 │
│ 4.24.232.63 │
│ …    ┆ …    ┆ …    │
│ 5.695.753.61 │
│ 5.665.683.56 │
│ 6.156.123.74 │
│ 5.835.873.64 │
└──────┴──────┴──────┘

上記は単純にカラム名を文字列で指定しているけど、代わりに Expression を使うことも考えられる。

>>> df_polars.select([pl.col("x"), pl.col("y"), pl.col("z")]).head(n=5)
shape: (5, 3)
┌──────┬──────┬──────┐
│ x    ┆ y    ┆ z    │
│ ---  ┆ ---  ┆ ---  │
│ f64  ┆ f64  ┆ f64  │
╞══════╪══════╪══════╡
│ 3.953.982.43 │
│ 3.893.842.31 │
│ 4.054.072.31 │
│ 4.24.232.63 │
│ 4.344.352.75 │
└──────┴──────┴──────┘

PySpark

PySpark の場合も select() メソッドを使うのは変わらない。 ただし、前述したとおり遅延評価が原則なのでメソッドの返り値からそのまま結果を確認することはできない。

>>> df_spark.select(["x", "y", "z"])
DataFrame[x: double, y: double, z: double]

繰り返しになるけど、処理した結果を得るためには評価しなければいけない。

>>> df_spark.select(["x", "y", "z"]).show(n=5)
+----+----+----+
|   x|   y|   z|
+----+----+----+
|3.95|3.98|2.43|
|3.89|3.84|2.31|
|4.05|4.07|2.31|
| 4.2|4.23|2.63|
|4.34|4.35|2.75|
+----+----+----+
only showing top 5 rows

また、PySpark に関しても、文字列でカラムを指定する代わりに Polars の Expression に相当する指定方法がある。

>>> from pyspark.sql import functions as F
>>> df_spark.select([F.col("x"), F.col("y"), F.col("z")]).show(n=5)
+----+----+----+
|   x|   y|   z|
+----+----+----+
|3.95|3.98|2.43|
|3.89|3.84|2.31|
|4.05|4.07|2.31|
| 4.2|4.23|2.63|
|4.34|4.35|2.75|
+----+----+----+
only showing top 5 rows

特定のカラムを追加する

続いてはデータフレームにカラムを追加するパターンを試してみよう。

Polars

Polars であれば with_columns() メソッドを使う。 ここでは xy カラムの内容を足して x_plus_y というカラムを追加した。

>>> df_polars.with_columns((pl.col("x") + pl.col("y")).alias("x_plus_y")).head(n=5)
shape: (5, 10)
┌───────┬─────────┬───────┬─────────┬───┬──────┬──────┬──────┬──────────┐
│ carat ┆ cut     ┆ color ┆ clarity ┆ … ┆ x    ┆ y    ┆ z    ┆ x_plus_y │
│ ---   ┆ ---     ┆ ---   ┆ ---     ┆   ┆ ---  ┆ ---  ┆ ---  ┆ ---      │
│ f64   ┆ cat     ┆ cat   ┆ cat     ┆   ┆ f64  ┆ f64  ┆ f64  ┆ f64      │
╞═══════╪═════════╪═══════╪═════════╪═══╪══════╪══════╪══════╪══════════╡
│ 0.23  ┆ Ideal   ┆ E     ┆ SI2     ┆ … ┆ 3.953.982.437.93     │
│ 0.21  ┆ Premium ┆ E     ┆ SI1     ┆ … ┆ 3.893.842.317.73     │
│ 0.23  ┆ Good    ┆ E     ┆ VS1     ┆ … ┆ 4.054.072.318.12     │
│ 0.29  ┆ Premium ┆ I     ┆ VS2     ┆ … ┆ 4.24.232.638.43     │
│ 0.31  ┆ Good    ┆ J     ┆ SI2     ┆ … ┆ 4.344.352.758.69     │
└───────┴─────────┴───────┴─────────┴───┴──────┴──────┴──────┴──────────┘

PySpark

PySpark は withColumns() というメソッドを使う。 見て分かるとおり PySpark は命名規則にキャメルケースを利用している。 Python は PEP8 に代表されるスネークケースを利用したコーディング規約を採用する場合が多いため違和感を覚えるかもしれない。

>>> df_spark.withColumns({"x_plus_y": F.col("x") + F.col("y")}).show(n=5)
+-----+-------+-----+-------+-----+-----+----+----+----+-----------------+
|carat|    cut|color|clarity|depth|table|   x|   y|   z|         x_plus_y|
+-----+-------+-----+-------+-----+-----+----+----+----+-----------------+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|3.95|3.98|2.43|             7.93|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|3.89|3.84|2.31|             7.73|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|4.05|4.07|2.31|8.120000000000001|
| 0.29|Premium|    I|    VS2| 62.4| 58.0| 4.2|4.23|2.63|             8.43|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|4.34|4.35|2.75|             8.69|
+-----+-------+-----+-------+-----+-----+----+----+----+-----------------+
only showing top 5 rows

条件で行を絞り込む

次に特定の条件で行を絞り込んでみよう。

Polars

Polars の場合は filter() メソッドで実現できる。 以下では、試しにカラット数が 3 以上の行を取り出している。

>>> df_polars.filter(pl.col("carat") > 3).head(n=5)
shape: (5, 9)
┌───────┬─────────┬───────┬─────────┬───┬───────┬──────┬──────┬──────┐
│ carat ┆ cut     ┆ color ┆ clarity ┆ … ┆ table ┆ x    ┆ y    ┆ z    │
│ ---   ┆ ---     ┆ ---   ┆ ---     ┆   ┆ ---   ┆ ---  ┆ ---  ┆ ---  │
│ f64   ┆ cat     ┆ cat   ┆ cat     ┆   ┆ f64   ┆ f64  ┆ f64  ┆ f64  │
╞═══════╪═════════╪═══════╪═════════╪═══╪═══════╪══════╪══════╪══════╡
│ 3.01  ┆ Premium ┆ I     ┆ I1      ┆ … ┆ 58.09.18.975.67 │
│ 3.11  ┆ Fair    ┆ J     ┆ I1      ┆ … ┆ 57.09.159.025.98 │
│ 3.01  ┆ Premium ┆ F     ┆ I1      ┆ … ┆ 56.09.249.135.73 │
│ 3.05  ┆ Premium ┆ E     ┆ I1      ┆ … ┆ 58.09.269.255.66 │
│ 3.02  ┆ Fair    ┆ I     ┆ I1      ┆ … ┆ 56.09.119.025.91 │
└───────┴─────────┴───────┴─────────┴───┴───────┴──────┴──────┴──────┘

PySpark

PySpark も filter() メソッドが使える。 同じ条件で絞り込んだ場合、記述方法もほとんど変わらない。

>>> df_spark.filter(F.col("carat") > 3).show(n=5)
+-----+-------+-----+-------+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+----+----+----+
| 3.01|Premium|    I|     I1| 62.7| 58.0| 9.1|8.97|5.67|
| 3.11|   Fair|    J|     I1| 65.9| 57.0|9.15|9.02|5.98|
| 3.01|Premium|    F|     I1| 62.2| 56.0|9.24|9.13|5.73|
| 3.05|Premium|    E|     I1| 60.9| 58.0|9.26|9.25|5.66|
| 3.02|   Fair|    I|     I1| 65.2| 56.0|9.11|9.02|5.91|
+-----+-------+-----+-------+-----+-----+----+----+----+
only showing top 5 rows

特定の値で集約する

次に特定の値で集約して、それぞれのグループについて要約統計量などを求めてみよう。

Polars

Polars では、group_by() メソッドで集約してから、計算する内容を agg() メソッドなどで指定する。 ここでは試しに color カラムで集約して carat カラムの平均を、それぞれのグループで計算してみよう。

>>> df_polars.group_by("color").agg(pl.mean("carat").alias("color_mean_carat"))
shape: (7, 2)
┌───────┬──────────────────┐
│ color ┆ color_mean_carat │
│ ---   ┆ ---              │
│ cat   ┆ f64              │
╞═══════╪══════════════════╡
│ J     ┆ 1.162137         │
│ G     ┆ 0.77119          │
│ I     ┆ 1.026927         │
│ D     ┆ 0.657795         │
│ F     ┆ 0.736538         │
│ E     ┆ 0.657867         │
│ H     ┆ 0.911799         │
└───────┴──────────────────┘

PySpark

PySpark であっても、やり方はほとんど変わらない。 ただし、メソッド名はキャメルケースなので group_by() ではなく groupBy() になる。

>>> df_spark.groupBy("color").agg(F.mean("carat").alias("color_mean_carat")).show()
+-----+------------------+
|color|  color_mean_carat|
+-----+------------------+
|    F|0.7365384615384617|
|    E|0.6578666938858808|
|    D| 0.657794833948337|
|    J|1.1621367521367514|
|    G|0.7711902231668448|
|    I|1.0269273330874222|
|    H| 0.911799132947978|
+-----+------------------+

まとめ

さて、ここまで Polars と PySpark の基本的な操作方法を簡単に比べてきた。 両者を見比べて、どのように感じられただろうか。 たしかに、操作方法として似通っている部分はあるようだ。

一方で、PySpark については遅延評価が原則となる点が使い勝手として大きく異なっている。 個人的な感想を述べると、遅延評価しか使えないのは結構めんどくさい。 インタラクティブに操作しているときなどは特に、すぐに結果を見せてほしくなる。

これまで、評価のタイミングは決め打ちになっているフレームワークが多かった。 たとえば Pandas は即時評価しか使えず、PySpark は遅延評価しか使えない。 遅延評価は最適化を効かせやすいというメリットがある一方で、インタラクティブな操作にはあまり向いていないように感じる 8。 そうした意味で、即時評価と遅延評価を使い分けることができる Polars は、使いやすさとパフォーマンスのバランスを上手くとった API になっているのではないだろうか。

とはいえ、もしもシングルノードで捌ききれないようなデータに直面したときは、PySpark (Apache Spark) の存在を思い出してもらいたい。 あるいは、選択肢のひとつになるかもしれないので。


  1. 以前は Apache Mesos もサポートされていたがバージョン 3.2 で非推奨になった
  2. Apache Spark が組み込みで提供している計算機クラスタを管理する仕組み
  3. せめてシングルノードでも内部で擬似的に分散処理をするような構成にするのが望ましい (Hadoop YARN Pseudo Distributed Mode など)
  4. PySpark の場合は Scala と Python の間で SerDe のオーバーヘッドもある
  5. Apache Spark であればテラバイトのデータを処理するのも割りと普通という印象がある
  6. CSV ファイルを読み込んでも良かったけど試しやすさを優先した
  7. 遅延評価を積み重ねる処理を Transformation、評価を実行する処理を Action という
  8. Vaex のように遅延評価を原則としていてもインタラクティブな操作で使い勝手を落としにくいように工夫した実装も存在する